Merge branch 'gerrit/ionic' to 'gerrit/master'

Change-Id: If0793d8dcd2caf64b2f1f9676157f1fc88f97c8e
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 92436c7..18caaa9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.optimizer.rules.pushdown.processor;
 
 import static org.apache.asterix.metadata.utils.PushdownUtil.getConstant;
+import static org.apache.asterix.metadata.utils.PushdownUtil.getTypeEnv;
 import static org.apache.asterix.metadata.utils.PushdownUtil.isAnd;
 import static org.apache.asterix.metadata.utils.PushdownUtil.isCompare;
 import static org.apache.asterix.metadata.utils.PushdownUtil.isConstant;
@@ -36,6 +37,9 @@
 import java.util.Set;
 
 import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
@@ -122,11 +126,12 @@
     /**
      * Handle a compare function
      *
-     * @param expression compare expression
+     * @param expression        compare expression
+     * @param currentDescriptor use descriptor being pushed down (its operator supplies the type environment)
      * @return true if the pushdown should continue, false otherwise
      */
-    protected abstract boolean handleCompare(AbstractFunctionCallExpression expression, int depth)
-            throws AlgebricksException;
+    protected abstract FilterBranch handleCompare(AbstractFunctionCallExpression expression, int depth,
+            UseDescriptor currentDescriptor) throws AlgebricksException;
 
     /**
      * Handle a value access path expression
@@ -134,10 +139,10 @@
      * @param expression path expression
      * @return true if the pushdown should continue, false otherwise
      */
-    protected final boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+    protected final FilterBranch handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
         IExpectedSchemaNode node = getPathNode(expression);
         if (node == null) {
-            return false;
+            return FilterBranch.NA;
         }
         return handlePath(expression, node);
     }
@@ -149,7 +154,7 @@
      * @param node       expected schema node (never null)
      * @return true if the pushdown should continue, false otherwise
      */
-    protected abstract boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected abstract FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException;
 
     protected abstract IExpectedSchemaNode getPathNode(AbstractFunctionCallExpression expression)
@@ -285,7 +290,7 @@
 
         // Prepare for pushdown
         preparePushdown(useDescriptor, scanDefineDescriptor);
-        if (pushdownFilterExpression(inlinedExpr, 0)) {
+        if (pushdownFilterExpression(inlinedExpr, useDescriptor, 0) != FilterBranch.NA) {
             putFilterInformation(scanDefineDescriptor, inlinedExpr);
             changed = true;
         }
@@ -293,57 +298,88 @@
         return changed;
     }
 
-    protected final boolean pushdownFilterExpression(ILogicalExpression expression, int depth)
-            throws AlgebricksException {
-        boolean pushdown = false;
+    /** Result of a filter pushdown attempt: the branch kind handled, or {@link #NA} if not pushable. */
+    public enum FilterBranch {
+        CONSTANT,
+        AND,
+        COMPARE,
+        FILTER_PATH,
+        FUNCTION,
+        NA;
+
+        /** Returns {@link #NA} if either side is {@link #NA}; otherwise returns {@code parentBranch}. */
+        public static FilterBranch andOutput(FilterBranch leftBranch, FilterBranch rightBranch,
+                FilterBranch parentBranch) {
+            boolean bothPushable = leftBranch != NA && rightBranch != NA;
+            return bothPushable ? parentBranch : NA;
+        }
+    }
+
+    protected final FilterBranch pushdownFilterExpression(ILogicalExpression expression, UseDescriptor useDescriptor,
+            int depth) throws AlgebricksException {
         if (isConstant(expression)) {
             IAObject constantValue = getConstant(expression);
             // Only non-derived types are allowed
-            pushdown = !constantValue.getType().getTypeTag().isDerivedType();
+            if (!constantValue.getType().getTypeTag().isDerivedType()) {
+                return FilterBranch.CONSTANT;
+            }
+            return FilterBranch.NA;
         } else if (isAnd(expression)) {
-            pushdown = handleAnd((AbstractFunctionCallExpression) expression, depth);
+            return handleAnd((AbstractFunctionCallExpression) expression, depth, useDescriptor);
         } else if (isCompare(expression)) {
-            pushdown = handleCompare((AbstractFunctionCallExpression) expression, depth);
+            return handleCompare((AbstractFunctionCallExpression) expression, depth, useDescriptor);
         } else if (isFilterPath(expression)) {
-            pushdown = handlePath((AbstractFunctionCallExpression) expression);
+            return handlePath((AbstractFunctionCallExpression) expression);
         } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
             // All functions including OR
-            pushdown = handleFunction((AbstractFunctionCallExpression) expression, depth);
+            return handleFunction((AbstractFunctionCallExpression) expression, depth, useDescriptor);
         }
         // PK variable should have (pushdown = false) as we should not involve the PK (at least currently)
-        return pushdown;
+        return FilterBranch.NA;
     }
 
-    private boolean handleAnd(AbstractFunctionCallExpression expression, int depth) throws AlgebricksException {
+    private FilterBranch handleAnd(AbstractFunctionCallExpression expression, int depth, UseDescriptor useDescriptor)
+            throws AlgebricksException {
         List<Mutable<ILogicalExpression>> args = expression.getArguments();
         Iterator<Mutable<ILogicalExpression>> argIter = args.iterator();
         while (argIter.hasNext()) {
             ILogicalExpression arg = argIter.next().getValue();
             // Allow for partial pushdown of AND operands
-            if (!pushdownFilterExpression(arg, depth + 1)) {
+            if (pushdownFilterExpression(arg, useDescriptor, depth + 1) == FilterBranch.NA) {
                 if (depth == 0) {
                     // Remove the expression that cannot be pushed down
                     argIter.remove();
                 } else {
-                    return false;
+                    return FilterBranch.NA;
                 }
             }
         }
-        return !args.isEmpty();
+        return !args.isEmpty() ? FilterBranch.AND : FilterBranch.NA;
     }
 
-    private boolean handleFunction(AbstractFunctionCallExpression expression, int depth) throws AlgebricksException {
+    /** Returns true when the expression's inferred type (after unwrapping a union) is ARRAY or ANY. */
+    protected boolean expressionReturnsArray(ILogicalExpression expression, ILogicalOperator operator)
+            throws AlgebricksException {
+        IAType type = (IAType) context.getExpressionTypeComputer().getType(expression,
+                context.getMetadataProvider(), getTypeEnv(operator, context));
+        ATypeTag tag = type.getTypeTag() == ATypeTag.UNION
+                ? ((AUnionType) type).getActualType().getTypeTag() : type.getTypeTag();
+        return tag == ATypeTag.ARRAY || tag == ATypeTag.ANY;
+    }
+
+    private FilterBranch handleFunction(AbstractFunctionCallExpression expression, int depth,
+            UseDescriptor useDescriptor) throws AlgebricksException {
         if (!expression.getFunctionInfo().isFunctional() || isNotPushable(expression)) {
-            return false;
+            return FilterBranch.NA;
         }
 
         for (Mutable<ILogicalExpression> argRef : expression.getArguments()) {
             ILogicalExpression arg = argRef.getValue();
             // Either all arguments are pushable or none
-            if (!pushdownFilterExpression(arg, depth + 1)) {
-                return false;
+            if (pushdownFilterExpression(arg, useDescriptor, depth + 1) == FilterBranch.NA) {
+                return FilterBranch.NA;
             }
         }
-        return true;
+        return FilterBranch.FUNCTION;
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 497e751..445da7e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -104,7 +104,8 @@
     }
 
     @Override
-    protected boolean handleCompare(AbstractFunctionCallExpression expression, int depth) throws AlgebricksException {
+    protected FilterBranch handleCompare(AbstractFunctionCallExpression expression, int depth,
+            UseDescriptor currentDescriptor) throws AlgebricksException {
         List<Mutable<ILogicalExpression>> args = expression.getArguments();
 
         Mutable<ILogicalExpression> leftRef = args.get(0);
@@ -113,17 +114,38 @@
         ILogicalExpression left = leftRef.getValue();
         ILogicalExpression right = rightRef.getValue();
 
-        return pushdownFilterExpression(left, depth + 1) && pushdownFilterExpression(right, depth + 1);
+        // If one side is a value access path (e.g., getField), the other side must not evaluate to an array
+        FilterBranch leftBranch = pushdownFilterExpression(left, currentDescriptor, depth + 1);
+        FilterBranch rightBranch = pushdownFilterExpression(right, currentDescriptor, depth + 1);
+
+        FilterBranch result = FilterBranch.andOutput(leftBranch, rightBranch, FilterBranch.COMPARE);
+        if (result == FilterBranch.NA) {
+            //If the result is NA, then we cannot push down the filter
+            return FilterBranch.NA;
+        }
+
+        boolean pushdown = true;
+        // A FILTER_PATH operand is a value access path that comes from the expression tree.
+        if (leftBranch == FilterBranch.FILTER_PATH && rightBranch == FilterBranch.FILTER_PATH) {
+            return FilterBranch.COMPARE;
+        } else if (leftBranch == FilterBranch.FILTER_PATH) {
+            // if the expression return type is an array or any, we cannot push it down
+            pushdown = !expressionReturnsArray(right, currentDescriptor.getOperator());
+        } else if (rightBranch == FilterBranch.FILTER_PATH) {
+            pushdown = !expressionReturnsArray(left, currentDescriptor.getOperator());
+        }
+
+        return pushdown ? result : FilterBranch.NA;
     }
 
     @Override
-    protected boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException {
         if (node.getType() != ExpectedSchemaNodeType.ANY) {
-            return false;
+            return FilterBranch.NA;
         }
         paths.put(expression, pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node));
-        return true;
+        return FilterBranch.FILTER_PATH;
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
index 030fb6e..04a9f6d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
@@ -83,7 +83,8 @@
     }
 
     @Override
-    protected boolean handleCompare(AbstractFunctionCallExpression expression, int depth) throws AlgebricksException {
+    protected FilterBranch handleCompare(AbstractFunctionCallExpression expression, int depth,
+            UseDescriptor currentDescriptor) throws AlgebricksException {
         List<Mutable<ILogicalExpression>> args = expression.getArguments();
 
         Mutable<ILogicalExpression> leftRef = args.get(0);
@@ -98,15 +99,15 @@
             return pushdownRangeFilter(left, right, expression, false);
         }
         // Either it is a compare that doesn't involve a constant there's a function that wraps the value access path
-        return false;
+        return FilterBranch.NA;
     }
 
     @Override
-    protected boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException {
         // This means we got something like WHERE $r.getField("isVerified") -- where isVerified is a boolean field.
         if (node.getType() != ExpectedSchemaNodeType.ANY) {
-            return false;
+            return FilterBranch.NA;
         }
         IAObject constantValue = ABoolean.TRUE;
         String functionName = expression.getFunctionIdentifier().getName();
@@ -116,7 +117,7 @@
         ARecordType path = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node, constantValue.getType(),
                 sourceInformationMap, functionCallInfo);
         paths.put(expression, path);
-        return true;
+        return FilterBranch.FILTER_PATH;
     }
 
     @Override
@@ -132,12 +133,12 @@
         scanDefineDescriptor.getPathLocations().putAll(sourceInformationMap);
     }
 
-    private boolean pushdownRangeFilter(ILogicalExpression pathExpr, ILogicalExpression constExpr,
+    private FilterBranch pushdownRangeFilter(ILogicalExpression pathExpr, ILogicalExpression constExpr,
             AbstractFunctionCallExpression funcExpr, boolean leftConstant) throws AlgebricksException {
         AnyExpectedSchemaNode node = getNode(pathExpr);
         IAObject constantValue = ((AsterixConstantValue) ((ConstantExpression) constExpr).getValue()).getObject();
         if (node == null || !SUPPORTED_CONSTANT_TYPES.contains(constantValue.getType().getTypeTag())) {
-            return false;
+            return FilterBranch.NA;
         }
         String functionName = funcExpr.getFunctionIdentifier().getName();
         SourceLocation sourceLocation = funcExpr.getSourceLocation();
@@ -146,7 +147,7 @@
         ARecordType path =
                 pathBuilderVisitor.buildPath(node, constantValue.getType(), sourceInformationMap, functionCallInfo);
         paths.put(pathExpr, path);
-        return true;
+        return FilterBranch.COMPARE;
     }
 
     private AnyExpectedSchemaNode getNode(ILogicalExpression expression) throws AlgebricksException {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
index 38dfde8..a5382c0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
@@ -50,15 +50,15 @@
     }
 
     @Override
-    protected boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException {
         if (node.getType() != ExpectedSchemaNodeType.ANY) {
-            return false;
+            return FilterBranch.NA;
         }
 
         // The inferred path from the provided expression
         ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
         paths.put(expression, expressionPath);
-        return true;
+        return FilterBranch.FILTER_PATH;
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
index 37a0128..50180c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -73,10 +73,10 @@
     }
 
     @Override
-    protected boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException {
         if (node.getType() != ExpectedSchemaNodeType.ANY) {
-            return false;
+            return FilterBranch.NA;
         }
 
         // The inferred path from the provided expression
@@ -84,8 +84,8 @@
         if (prefix.getPaths().contains(expressionPath)) {
             // The expression refer to a declared computed field. Add it to the filter paths
             paths.put(expression, expressionPath);
-            return true;
+            return FilterBranch.FILTER_PATH;
         }
-        return false;
+        return FilterBranch.NA;
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
index 6545964..f6c94c6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
@@ -50,16 +50,16 @@
     }
 
     @Override
-    protected boolean handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
+    protected FilterBranch handlePath(AbstractFunctionCallExpression expression, IExpectedSchemaNode node)
             throws AlgebricksException {
         if (node.getType() != ExpectedSchemaNodeType.ANY) {
-            return false;
+            return FilterBranch.NA;
         }
 
         // The inferred path from the provided expression
         ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
         paths.put(expression, expressionPath);
-        return true;
+        return FilterBranch.FILTER_PATH;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index a431f68..d2301a3 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -559,7 +559,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          **/CloudStorageTest.java,**/CloudStorageSparseTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           **/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -680,6 +680,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageSparseTest.java,
           **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
new file mode 100644
index 0000000..b7a264b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageSparseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+/**
+ * Run tests in cloud deployment environment using the sparse configuration (cc-cloud-storage-sparse.conf)
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageSparseTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-sparse.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
+
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+
+    public CloudStorageSparseTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    /** Starts the local S3 mock, sets up the execution environment, and creates the empty playground bucket. */
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+
+        // create the playground bucket and leave it empty, just for external collection-based tests
+        S3ClientBuilder builder = S3Client.builder().region(Region.of(MOCK_SERVER_REGION))
+                .credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(URI.create(MOCK_SERVER_HOSTNAME)); // endpoint pointing to S3 mock server
+        try (S3Client client = builder.build()) { // try-with-resources: close the client even on failure
+            client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+        LocalCloudUtilAdobeMock.shutdownSilently();
+    }
+
+    @Parameters(name = "CloudStorageSparseTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> units = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(units.size() > 1 || !EXCLUDED_TESTS.equals(getText(units.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+        // After the test case has run, every NC's buffer cache must report itself clean
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) {
+            IBufferCache cache = ((INcApplicationContext) nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) cache).isClean());
+        }
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
index d0823c7..6f19393 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppSinglePartitionExecutionTest.java
@@ -28,7 +28,10 @@
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,6 +71,11 @@
     @Test
     public void test() throws Exception {
         LangExecutionUtil.test(tcCtx);
+        IBufferCache bufferCache;
+        for (NodeControllerService nc : ExecutionTestUtil.integrationUtil.ncs) {
+            bufferCache = ((INcApplicationContext) nc.getApplicationContext()).getBufferCache();
+            Assert.assertTrue(((BufferCache) bufferCache).isClean());
+        }
     }
 
     private static void setNcEndpoints(TestExecutor testExecutor) {
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
new file mode 100644
index 0000000..0842e01
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-sparse.conf
@@ -0,0 +1,77 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m --add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=sparse
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.ordered.fields=false
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=s3
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-west-2
+cloud.storage.endpoint=http://127.0.0.1:8001
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=selective
+cloud.max.write.requests.per.second=2000
+cloud.max.read.requests.per.second=4000
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 551d2c4..b3d1e2d 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -42,6 +42,7 @@
 storage.buffercache.size=128MB
 storage.memorycomponent.globalbudget=512MB
 storage.max.columns.in.zeroth.segment=800
+storage.page.zero.writer=default
 
 [cc]
 address = 127.0.0.1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.001.ddl.sqlpp
new file mode 100644
index 0000000..d327ee4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.001.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE COLLECTION ColumnDataset PRIMARY KEY (id: String)
+WITH {
+    "storage-format": {
+        "format": "column"
+     }
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.002.update.sqlpp
new file mode 100644
index 0000000..2c83e38
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.002.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ColumnDataset ({
+     "id": "1",
+     "homo_array_field": [1, 2, 3],
+     "hetero_array_field": [1, "a", true],
+     "nested_field": {
+        "a": 1,
+        "array_field": [1, "a", true]
+     }
+});
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.003.query.sqlpp
new file mode 100644
index 0000000..5fc3361
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.003.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT * FROM ColumnDataset
+WHERE homo_array_field = [1, 2, 3];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.004.query.sqlpp
new file mode 100644
index 0000000..918de5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.004.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT * FROM ColumnDataset
+WHERE hetero_array_field = [1, "a", true];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.005.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.005.query.sqlpp
new file mode 100644
index 0000000..33834dc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.005.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `import-private-functions` `true`;
+
+SELECT * FROM ColumnDataset
+WHERE hetero_array_field = `ordered-list-constructor`(1, "a", true);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.006.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.006.query.sqlpp
new file mode 100644
index 0000000..6e2c9e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.006.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT * FROM ColumnDataset
+WHERE hetero_array_field = to_array([1, "a", true]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.007.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.007.query.sqlpp
new file mode 100644
index 0000000..1a717a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.007.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT * FROM ColumnDataset c
+WHERE c.n.x + 1 = 13;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.008.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.008.query.sqlpp
new file mode 100644
index 0000000..743069a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.008.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `import-private-functions` `true`;
+
+EXPLAIN
+SELECT * FROM ColumnDataset c
+WHERE `field-access-by-name`(`field-access-by-name`(c, "nested_field"), "array_field") = [1, "a", true];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.009.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.009.query.sqlpp
new file mode 100644
index 0000000..2f2e6ad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/return-array/return-array.009.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `import-private-functions` `true`;
+
+EXPLAIN
+SELECT * FROM ColumnDataset c
+WHERE `field-access-by-name`(`field-access-by-name`(c, "nested_field"), "a") = 1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.ddl.sqlpp
new file mode 100644
index 0000000..8fe4194
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE TestDataverse IF EXISTS;
+CREATE DATAVERSE TestDataverse;
+USE TestDataverse;
+
+CREATE TYPE col1Type AS { 
+id:int
+};
+
+CREATE TYPE col2Type AS {
+id:int,
+f:int
+}; 
+
+CREATE DATASET col1(col1Type) PRIMARY KEY id;
+
+CREATE DATASET col2(col2Type) PRIMARY KEY id;
+
+CREATE INDEX idx_id ON col2(f);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.update.sqlpp
new file mode 100644
index 0000000..90a6b2a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE TestDataverse;
+
+UPSERT INTO col1 {"id": 1, "f": 8};
+UPSERT INTO col1 {"id": 2, "f": 100};
+UPSERT INTO col2 {"id": 1, "f": 8};
+UPSERT INTO col2 {"id": 2, "f": 33};
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.query.sqlpp
new file mode 100644
index 0000000..b746539
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE TestDataverse;
+
+FROM col1 LEFT JOIN col2 ON col1.f /*+ indexnl */ = col2.f
+SELECT col1, col2
+ORDER BY col1.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.4.query.sqlpp
new file mode 100644
index 0000000..8df0469
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.4.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE TestDataverse;
+
+FROM col1 LEFT JOIN col2 ON col1.f /*+ indexnl */ > col2.f
+SELECT col1, col2
+ORDER BY col1.id,col2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.5.query.sqlpp
new file mode 100644
index 0000000..c574b0f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.5.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE TestDataverse;
+
+FROM col1 LEFT JOIN col2 ON col1.f /*+ indexnl */< col2.f
+SELECT col1, col2
+ORDER BY col1.id,col2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.003.adm
new file mode 100644
index 0000000..03721c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.003.adm
@@ -0,0 +1 @@
+{ "ColumnDataset": { "id": "1", "homo_array_field": [ 1, 2, 3 ], "hetero_array_field": [ 1, "a", true ], "nested_field": { "a": 1, "array_field": [ 1, "a", true ] } } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.004.adm
new file mode 100644
index 0000000..03721c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.004.adm
@@ -0,0 +1 @@
+{ "ColumnDataset": { "id": "1", "homo_array_field": [ 1, 2, 3 ], "hetero_array_field": [ 1, "a", true ], "nested_field": { "a": 1, "array_field": [ 1, "a", true ] } } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.005.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.005.adm
new file mode 100644
index 0000000..03721c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.005.adm
@@ -0,0 +1 @@
+{ "ColumnDataset": { "id": "1", "homo_array_field": [ 1, 2, 3 ], "hetero_array_field": [ 1, "a", true ], "nested_field": { "a": 1, "array_field": [ 1, "a", true ] } } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.006.adm
new file mode 100644
index 0000000..03721c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.006.adm
@@ -0,0 +1 @@
+{ "ColumnDataset": { "id": "1", "homo_array_field": [ 1, 2, 3 ], "hetero_array_field": [ 1, "a", true ], "nested_field": { "a": 1, "array_field": [ 1, "a", true ] } } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.007.adm
new file mode 100644
index 0000000..bb24b8a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.007.adm
@@ -0,0 +1,18 @@
+distribute result [$$18] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$18] <- [{"c": $$c}] project: [$$18] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq(numeric-add($$c.getField("n").getField("x"), 1), 13)) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$19, $$c] <- test.ColumnDataset filter on: eq(numeric-add($$c.getField("n").getField("x"), 1), 13) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.008.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.008.adm
new file mode 100644
index 0000000..1935885
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.008.adm
@@ -0,0 +1,18 @@
+distribute result [$$18] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$18] <- [{"c": $$c}] project: [$$18] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq($$c.getField("nested_field").getField("array_field"), ordered-list-constructor(1, "a", true))) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$19, $$c] <- test.ColumnDataset [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.009.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.009.adm
new file mode 100644
index 0000000..9b20402
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/return-array/return-array.009.adm
@@ -0,0 +1,18 @@
+distribute result [$$17] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$17] <- [{"c": $$c}] project: [$$17] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq($$c.getField("nested_field").getField("a"), 1)) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$18, $$c] <- test.ColumnDataset filter on: eq($$c.getField("nested_field").getField("a"), 1) range-filter on: eq($$c.getField("nested_field").getField("a"), 1) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.adm
new file mode 100644
index 0000000..cb3a9d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.1.adm
@@ -0,0 +1,2 @@
+{ "col1": { "id": 1, "f": 8 }, "col2": { "id": 1, "f": 8 } }
+{ "col1": { "id": 2, "f": 100 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.adm
new file mode 100644
index 0000000..8e7d14d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.2.adm
@@ -0,0 +1,3 @@
+{ "col1": { "id": 1, "f": 8 } }
+{ "col1": { "id": 2, "f": 100 }, "col2": { "id": 1, "f": 8 } }
+{ "col1": { "id": 2, "f": 100 }, "col2": { "id": 2, "f": 33 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.adm
new file mode 100644
index 0000000..ece0d0d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-leftouterjoin/index-leftouterjoin-example/index-leftouterjoin-example.3.adm
@@ -0,0 +1,2 @@
+{ "col1": { "id": 1, "f": 8 }, "col2": { "id": 2, "f": 33 } }
+{ "col1": { "id": 2, "f": 100 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.007.adm
new file mode 100644
index 0000000..4c09511
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.007.adm
@@ -0,0 +1,18 @@
+distribute result [$$18] [cardinality: 0.0, op-cost: 0.0, total-cost: 1.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 1.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$18] <- [{"c": $$c}] project: [$$18] [cardinality: 0.0, op-cost: 0.0, total-cost: 1.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq(numeric-add($$c.getField("n").getField("x"), 1), 13)) [cardinality: 0.0, op-cost: 0.0, total-cost: 1.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$19, $$c] <- test.ColumnDataset filter on: eq(numeric-add($$c.getField("n").getField("x"), 1), 13) [cardinality: 1.0, op-cost: 1.0, total-cost: 1.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.008.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.008.adm
new file mode 100644
index 0000000..4f825c4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.008.adm
@@ -0,0 +1,18 @@
+distribute result [$$18] [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$18] <- [{"c": $$c}] project: [$$18] [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq($$c.getField("nested_field").getField("array_field"), ordered-list-constructor(1, "a", true))) [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$19, $$c] <- test.ColumnDataset [cardinality: 1.0, op-cost: 1.0, total-cost: 1.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.009.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.009.adm
new file mode 100644
index 0000000..7b0fd44
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/return-array/return-array.009.adm
@@ -0,0 +1,18 @@
+distribute result [$$17] [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$17] <- [{"c": $$c}] project: [$$17] [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+    -- ASSIGN  |PARTITIONED|
+      select (eq($$c.getField("nested_field").getField("a"), 1)) [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+      -- STREAM_SELECT  |PARTITIONED|
+        project ([$$c]) [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 1.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            data-scan []<-[$$18, $$c] <- test.ColumnDataset filter on: eq($$c.getField("nested_field").getField("a"), 1) range-filter on: eq($$c.getField("nested_field").getField("a"), 1) [cardinality: 1.0, op-cost: 1.0, total-cost: 1.0]
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 4ae1ea6..355019a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -14727,6 +14727,11 @@
   </test-group>
   <test-group name="index-leftouterjoin">
     <test-case FilePath="index-leftouterjoin">
+      <compilation-unit name="index-leftouterjoin-example">
+        <output-dir compare="Text">index-leftouterjoin-example</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="index-leftouterjoin">
       <compilation-unit name="probe-pidx-with-join-btree-sidx1">
         <output-dir compare="Text">probe-pidx-with-join-btree-sidx1</output-dir>
       </compilation-unit>
@@ -16624,6 +16629,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="filter/return-array">
+        <output-dir compare="Text">filter/return-array</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="delete/001">
         <output-dir compare="Text">delete/001</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
index 5da5cea..4a54ce9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
@@ -105,6 +105,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="filter/return-array">
+        <output-dir compare="Text">filter/return-array</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="io/flush/ASTERIXDB-3597">
         <output-dir compare="Text">io/flush/ASTERIXDB-3597</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
index bb02166..1ab3052 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiPageZeroByteBuffersReader.java
@@ -34,9 +34,12 @@
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 
 public final class MultiPageZeroByteBuffersReader {
     private static final ByteBuffer EMPTY;
+    private final IntList notRequiredSegmentsIndexes;
     private ColumnMultiPageZeroBufferProvider bufferProvider;
     private final Int2IntMap segmentDir; // should I just create a buffer[numberOfSegments] instead?
     private int maxBuffersSize;
@@ -51,15 +54,15 @@
     public MultiPageZeroByteBuffersReader() {
         this.buffers = new ArrayList<>();
         segmentDir = new Int2IntOpenHashMap();
+        notRequiredSegmentsIndexes = new IntArrayList();
         segmentDir.defaultReturnValue(-1);
     }
 
-    public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
-        ColumnMultiPageZeroBufferProvider pageZeroBufferProvider = (ColumnMultiPageZeroBufferProvider) bufferProvider;
+    public void reset(IColumnBufferProvider pageZeroBufferProvider) throws HyracksDataException {
         reset();
-        this.bufferProvider = pageZeroBufferProvider;
-        maxBuffersSize = pageZeroBufferProvider.getNumberOfRemainingPages();
-        pageZeroBufferProvider.readAll(buffers, segmentDir);
+        this.bufferProvider = (ColumnMultiPageZeroBufferProvider) pageZeroBufferProvider;
+        maxBuffersSize = bufferProvider.getNumberOfRemainingPages();
+        bufferProvider.readAll(buffers, segmentDir);
     }
 
     public void read(int segmentIndex, IPointable pointable, int position, int length)
@@ -82,7 +85,8 @@
         pointable.set(buffer.array(), position, length);
     }
 
-    public void readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage) {
+    public int readOffset(long[] offsetColumnIndexPairs, int maxColumnsInZerothSegment, int numberOfColumnsInAPage,
+            int currentColumnIndex) {
         int numberOfColumns = offsetColumnIndexPairs.length - 1;
         for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
             int segmentIndex = pair.getIntKey();
@@ -92,18 +96,20 @@
             int segmentOffset = 0;
             for (int j = 0; j < numberOfColumnsInAPage; j++) {
                 int columnOffset = buffer.getInt(segmentOffset);
-                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                offsetColumnIndexPairs[currentColumnIndex] = IntPairUtil.of(columnOffset, columnIndex);
                 segmentOffset += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+                currentColumnIndex++;
                 columnIndex++;
                 if (columnIndex == numberOfColumns) {
                     break; // No need to read more columns from this buffer.
                 }
             }
         }
+        return currentColumnIndex;
     }
 
-    public void readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
-            int numberOfColumnsInLastSegment) {
+    public int readSparseOffset(long[] offsetColumnIndexPairs, int numberOfPageSegments, int numberOfColumnsInAPage,
+            int numberOfColumnsInLastSegment, int currentColumnIndex) {
         for (Int2IntMap.Entry pair : segmentDir.int2IntEntrySet()) {
             int segmentIndex = pair.getIntKey();
             int bufferIndex = pair.getIntValue();
@@ -114,10 +120,11 @@
             for (int j = 0; j < numberOfColumnsInSegment; j++) {
                 int columnIndex = buffer.getInt(segmentOffset);
                 int columnOffset = buffer.getInt(segmentOffset + Integer.BYTES);
-                offsetColumnIndexPairs[columnIndex] = IntPairUtil.of(columnOffset, columnIndex);
+                offsetColumnIndexPairs[currentColumnIndex++] = IntPairUtil.of(columnOffset, columnIndex);
                 segmentOffset += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
             }
         }
+        return currentColumnIndex;
     }
 
     public void readAllColumns(BitSet presentColumns, int numberOfPageSegments, int numberOfColumnsInAPage,
@@ -143,6 +150,32 @@
         }
     }
 
+    public void unPinNotRequiredSegments(BitSet pageZeroSegmentsPages, int numberOfPageZeroSegments)
+            throws HyracksDataException {
+        if (numberOfPageZeroSegments <= 1) {
+            // If there is only one segment, it is always pinned.
+            // So no need to unpin the segments.
+            return;
+        }
+        notRequiredSegmentsIndexes.clear();
+        // Start checking from index 1 (0th segment is always pinned)
+        int i = pageZeroSegmentsPages.nextClearBit(1);
+        while (i >= 1 && i < numberOfPageZeroSegments) {
+            int segmentIndex = i - 1; // Adjusted index for segmentDir
+
+            int bufferIndex = segmentDir.get(segmentIndex);
+            if (bufferIndex != -1) {
+                buffers.set(bufferIndex, EMPTY);
+                notRequiredSegmentsIndexes.add(bufferIndex);
+                segmentDir.remove(segmentIndex);
+            }
+
+            i = pageZeroSegmentsPages.nextClearBit(i + 1);
+        }
+        // Unpin the buffers that are not required anymore.
+        bufferProvider.releasePages(notRequiredSegmentsIndexes);
+    }
+
     public int findColumnIndexInSegment(int segmentIndex, int columnIndex, int numberOfColumnsInSegment)
             throws HyracksDataException {
         if (segmentIndex < 0 || segmentIndex >= maxBuffersSize) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index a1fcfe3..bf76a6b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -145,8 +145,8 @@
     }
 
     @Override
-    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
-            IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
+            boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
         int spaceOccupiedByDefaultWriter;
         int spaceOccupiedBySparseWriter;
 
@@ -157,13 +157,13 @@
             return spaceOccupiedByDefaultWriter;
         } else if (writerType == IColumnPageZeroWriter.ColumnPageZeroWriterType.SPARSE) {
             // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
-            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
             return spaceOccupiedBySparseWriter;
         }
 
         spaceOccupiedByDefaultWriter =
                 getSpaceOccupiedByDefaultWriter(maxColumnsInPageZerothSegment, includeCurrentTupleColumns);
-        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
         pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
                 spaceOccupiedBySparseWriter);
 
@@ -179,11 +179,14 @@
         return spaceOccupiedByDefaultWriter;
     }
 
-    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment, int bufferCapacity) {
         int presentColumns = transformerForCurrentTuple.getNumberOfVisitedColumnsInBatch();
-        int numberOfPagesRequired = (int) Math.ceil(
-                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
-        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        int maximumNumberOfColumnsInASegment =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
+        int numberOfExtraPagesRequired = presentColumns <= maxColumnsInPageZerothSegment ? 0
+                : (int) Math.ceil(
+                        (double) (presentColumns - maxColumnsInPageZerothSegment) / maximumNumberOfColumnsInASegment);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfExtraPagesRequired);
         presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
 
         // space occupied by the sparse writer
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index d31e1d3..fb5cfdb 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -155,8 +155,8 @@
     }
 
     @Override
-    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, boolean includeCurrentTupleColumns,
-            IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
+    public int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
+            boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType writerType) {
         int spaceOccupiedByDefaultWriter;
         int spaceOccupiedBySparseWriter;
 
@@ -167,11 +167,11 @@
             return spaceOccupiedByDefaultWriter;
         } else if (writerType == IColumnPageZeroWriter.ColumnPageZeroWriterType.SPARSE) {
             // Maximum space occupied by the columns = maxColumnsInPageZerothSegment * (offset + filter size)
-            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+            spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
             return spaceOccupiedBySparseWriter;
         }
 
-        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment);
+        spaceOccupiedBySparseWriter = getSpaceOccupiedBySparseWriter(maxColumnsInPageZerothSegment, bufferCapacity);
         spaceOccupiedByDefaultWriter =
                 getSpaceOccupiedByDefaultWriter(maxColumnsInPageZerothSegment, includeCurrentTupleColumns);
         pageZeroWriterFlavorSelector.switchPageZeroWriterIfNeeded(spaceOccupiedByDefaultWriter,
@@ -188,11 +188,14 @@
         return spaceOccupiedByDefaultWriter;
     }
 
-    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment) {
+    private int getSpaceOccupiedBySparseWriter(int maxColumnsInPageZerothSegment, int bufferCapacity) {
         int presentColumns = presentColumnsIndexes.cardinality();
-        int numberOfPagesRequired = (int) Math.ceil(
-                (double) (presentColumns - maxColumnsInPageZerothSegment) / IColumnPageZeroWriter.MIN_COLUMN_SPACE);
-        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfPagesRequired);
+        int maximumNumberOfColumnsInASegment =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
+        int numberOfExtraPagesRequired = presentColumns <= maxColumnsInPageZerothSegment ? 0
+                : (int) Math.ceil(
+                        (double) (presentColumns - maxColumnsInPageZerothSegment) / maximumNumberOfColumnsInASegment);
+        int headerSpace = SparseColumnMultiPageZeroWriter.getHeaderSpace(numberOfExtraPagesRequired);
         presentColumns = Math.min(presentColumns, maxColumnsInPageZerothSegment);
 
         // space occupied by the sparse writer
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
index 831e459..5deead4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/PageZeroWriterFlavorSelector.java
@@ -50,7 +50,7 @@
  * space requirements of each approach.
  */
 public class PageZeroWriterFlavorSelector implements IColumnPageZeroWriterFlavorSelector {
-    protected byte writerFlag = IColumnPageZeroWriter.ColumnPageZeroWriterType.ADAPTIVE.getWriterFlag();
+    protected byte writerFlag = IColumnPageZeroWriter.ColumnPageZeroWriterType.DEFAULT.getWriterFlag();
 
     // Cache of writer instances to avoid repeated object creation
     private final Byte2ObjectArrayMap<IColumnPageZeroWriter> writers;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
index a34d8c1..d756a6c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/DefaultColumnPageZeroReader.java
@@ -30,6 +30,7 @@
 import java.util.BitSet;
 
 import org.apache.asterix.column.zero.writers.DefaultColumnPageZeroWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
@@ -43,12 +44,11 @@
     protected static Logger LOGGER = LogManager.getLogger();
 
     protected ByteBuffer pageZeroBuf;
-    protected BitSet pageZeroSegmentsPages;
+    protected static final BitSet EMPTY_SEGMENTS = new BitSet();
     protected int numberOfPresentColumns;
     protected int headerSize;
 
     public DefaultColumnPageZeroReader() {
-        this.pageZeroSegmentsPages = new BitSet();
     }
 
     @Override
@@ -160,18 +160,19 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < offsetColumnIndexPairs.length; i++) {
+        for (int i = 0; i < numberOfPresentColumns; i++) {
             int offset = pageZeroBuf.getInt(columnOffsetStart);
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
             columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
+        return numberOfPresentColumns;
     }
 
     @Override
     public BitSet getPageZeroSegmentsPages() {
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
     }
 
     @Override
@@ -186,9 +187,12 @@
 
     @Override
     public BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll) {
-        pageZeroSegmentsPages.clear();
-        pageZeroSegmentsPages.set(0);
-        return pageZeroSegmentsPages;
+        return EMPTY_SEGMENTS;
+    }
+
+    @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        // No-op: this reader reports no extra page-zero segments (EMPTY_SEGMENTS), so there is nothing to unpin.
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
index 3b4fdc4..5955d5e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/readers/SparseColumnPageZeroReader.java
@@ -42,6 +42,12 @@
     }
 
     @Override
+    public void reset(ByteBuffer pageZeroBuf, int numberOfPresentColumns, int headerSize) {
+        super.reset(pageZeroBuf, numberOfPresentColumns, headerSize);
+        columnIndexToRelativeColumnIndex.clear();
+    }
+
+    @Override
     public int getColumnOffset(int columnIndex) {
         int relativeColumnIndex = getRelativeColumnIndex(columnIndex);
         return pageZeroBuf.getInt(
@@ -137,7 +143,7 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnIndex = getColumnIndex(0);
         for (int i = 0; i < numberOfPresentColumns; i++) {
             int column = pageZeroBuf.getInt(columnIndex);
@@ -145,5 +151,6 @@
             offsetColumnIndexPairs[i] = IntPairUtil.of(offset, column);
             columnIndex += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
+        return numberOfPresentColumns;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
index 5a0b180..d4ffbb4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroReader.java
@@ -60,8 +60,8 @@
         super();
         zerothSegmentReader = new DefaultColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                DefaultColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
     }
 
@@ -230,19 +230,24 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length, zerothSegmentMaxColumns); i++) {
+        int numberOfColumns = getNumberOfPresentColumns();
+        int currentColumnIndex = 0;
+        while (currentColumnIndex < Math.min(numberOfColumns, zerothSegmentMaxColumns)) {
             // search in the 0th segment
             int offset = pageZeroBuf.getInt(columnOffsetStart);
-            offsetColumnIndexPairs[i] = IntPairUtil.of(offset, i);
+            offsetColumnIndexPairs[currentColumnIndex] = IntPairUtil.of(offset, currentColumnIndex);
             columnOffsetStart += DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
+            currentColumnIndex++;
         }
 
-        if (offsetColumnIndexPairs.length > zerothSegmentMaxColumns) {
+        if (numberOfColumns > zerothSegmentMaxColumns) {
             // read the rest of the columns from the segment stream
-            segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns, maxNumberOfColumnsInAPage);
+            currentColumnIndex = segmentBuffers.readOffset(offsetColumnIndexPairs, zerothSegmentMaxColumns,
+                    maxNumberOfColumnsInAPage, currentColumnIndex);
         }
+        return currentColumnIndex;
     }
 
     @Override
@@ -255,7 +260,8 @@
         } else {
             // Iterate over the projected columns and mark the segments that contain them
             int currentIndex = projectedColumns.nextSetBit(zerothSegmentMaxColumns);
-            while (currentIndex >= 0) {
+            int totalNumberOfColumns = getNumberOfPresentColumns();
+            while (currentIndex >= 0 && currentIndex < totalNumberOfColumns) {
                 int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
 
                 int fromSegmentIndex = (currentIndex - zerothSegmentMaxColumns) / maxNumberOfColumnsInAPage + 1;
@@ -273,6 +279,11 @@
     }
 
     @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, numberOfPageZeroSegments);
+    }
+
+    @Override
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
index 7f16d5e..5c7d383 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/DefaultColumnMultiPageZeroWriter.java
@@ -95,8 +95,7 @@
         segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef); // should this be populated at reset?
         this.zerothSegmentWriter = new DefaultColumnPageZeroWriter();
         this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
-        this.maximumNumberOfColumnsInAPage = bufferCapacity
-                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+        this.maximumNumberOfColumnsInAPage = getMaximumNumberOfColumnsInAPage(bufferCapacity);
     }
 
     @Override
@@ -263,4 +262,9 @@
     public int getHeaderSize() {
         return EXTENDED_HEADER_SIZE;
     }
+
+    public static int getMaximumNumberOfColumnsInAPage(int bufferCapacity) {
+        return bufferCapacity
+                / (DefaultColumnPageZeroWriter.COLUMN_OFFSET_SIZE + DefaultColumnPageZeroWriter.FILTER_SIZE);
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
index 0a0a817..035db5d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroReader.java
@@ -64,8 +64,8 @@
         super();
         zerothSegmentReader = new SparseColumnPageZeroReader();
         this.pageZeroSegmentsPages = new BitSet();
-        this.maxNumberOfColumnsInAPage = bufferCapacity
-                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.maxNumberOfColumnsInAPage =
+                SparseColumnMultiPageZeroWriter.getMaximumNumberOfColumnsInAPage(bufferCapacity);
         this.offsetPointable = new VoidPointable();
         this.columnIndexToRelativeColumnIndex = new Int2IntOpenHashMap();
         columnIndexToRelativeColumnIndex.defaultReturnValue(-1);
@@ -118,28 +118,24 @@
         // This method finds the segment index (except for 0th segment) for the given columnIndex.
         if (numberOfPageZeroSegments == 1) {
             // only zeroth segment is present
-            return -1;
+            return 0;
         }
         // gives 0 based segment index (0 for zeroth segment, 1 for first segment, etc.)
-        if (columnIndex <= maxColumnIndexInZerothSegment) {
-            return 0;
-        } else {
-            int start = 0;
-            int end = numberOfPageZeroSegments - 1;
-            int resultSegment = -1;
-            while (start <= end) {
-                int mid = (start + end) / 2;
-                int segmentColumnIndex =
-                        pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
-                if (segmentColumnIndex >= columnIndex) {
-                    resultSegment = mid;
-                    end = mid - 1; // continue searching in the left half
-                } else {
-                    start = mid + 1;
-                }
+        int start = 1;
+        int end = numberOfPageZeroSegments - 1;
+        int resultSegment = -1;
+        while (start <= end) {
+            int mid = (start + end) / 2;
+            int segmentColumnIndex =
+                    pageZeroBuf.getInt(MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + mid * Integer.BYTES);
+            if (segmentColumnIndex >= columnIndex) {
+                resultSegment = mid;
+                end = mid - 1; // continue searching in the left half
+            } else {
+                start = mid + 1;
             }
-            return resultSegment;
         }
+        return resultSegment;
     }
 
     private int findRelativeColumnIndex(int columnIndex) throws HyracksDataException {
@@ -150,7 +146,7 @@
             return zerothSegmentReader.getRelativeColumnIndex(columnIndex);
         } else {
             int segmentIndex = findSegment(columnIndex);
-            if (segmentIndex == -1) {
+            if (segmentIndex <= 0) {
                 return -1;
             }
             segmentIndex -= 1; // Adjusting to get the segment index for the segment stream
@@ -303,23 +299,30 @@
     }
 
     @Override
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
-        // OffsetColumnIndexPairs is of size getNumberOfPresentColumns() + 1
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        // offsetColumnIndexPairs.length >= getNumberOfPresentColumns() + 1 (larger if sized for a previous MegaLeaf).
+        // Do not rely on offsetColumnIndexPairs.length, as it may be larger than the number of present columns.
+        // The same array is reused for multiple leaves, and previous leaves may have more columns.
         int columnOffsetStart = headerSize;
-        for (int i = 0; i < Math.min(offsetColumnIndexPairs.length - 1, numberOfColumnInZerothSegment); i++) {
+        int currentColumnIndex = 0;
+        int numberOfColumns = getNumberOfPresentColumns();
+        while (currentColumnIndex < Math.min(numberOfColumns, numberOfColumnInZerothSegment)) {
             int columnIndex = pageZeroBuf.getInt(columnOffsetStart);
             int columnOffset = pageZeroBuf.getInt(columnOffsetStart + SparseColumnPageZeroWriter.COLUMN_INDEX_SIZE);
-            offsetColumnIndexPairs[i] = IntPairUtil.of(columnOffset, columnIndex);
+            offsetColumnIndexPairs[currentColumnIndex++] = IntPairUtil.of(columnOffset, columnIndex);
             columnOffsetStart += SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE;
         }
 
-        if (offsetColumnIndexPairs.length - 1 > numberOfColumnInZerothSegment) {
+        // If the pages are not pinned, no columnIndex is read, so stale entries remain in offsetColumnIndexPairs.
+        if (numberOfColumns > numberOfColumnInZerothSegment) {
             // read the rest of the columns from the segment stream
             int columnsInLastSegment = getNumberOfPresentColumns() - numberOfColumnInZerothSegment
                     - (numberOfPageZeroSegments - 2) * maxNumberOfColumnsInAPage;
-            segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments, maxNumberOfColumnsInAPage,
-                    columnsInLastSegment);
+            currentColumnIndex = segmentBuffers.readSparseOffset(offsetColumnIndexPairs, numberOfPageZeroSegments,
+                    maxNumberOfColumnsInAPage, columnsInLastSegment, currentColumnIndex);
         }
+
+        return currentColumnIndex;
     }
 
     @Override
@@ -350,7 +353,19 @@
             while (currentIndex >= 0) {
                 int rangeEnd = projectedColumns.nextClearBit(currentIndex); // exclusive
                 int startSegmentIndex = findSegment(currentIndex);
+                if (startSegmentIndex == -1) {
+                    // currentIndex is greater than the max column index of the last segment,
+                    // so this leaf does not need to pin any segment for the requested column ranges.
+
+                    // Stop early: every later set bit in projectedColumns is also out of bounds.
+                    break;
+                }
                 int endSegmentIndex = findSegment(rangeEnd - 1);
+                if (endSegmentIndex == -1) {
+                    // rangeEnd - 1 is greater than the max column index of the last segment, but
+                    // startSegmentIndex is valid, so pin up to (and including) the last segment.
+                    endSegmentIndex = numberOfPageZeroSegments - 1; // Last segment index
+                }
 
                 if (startSegmentIndex <= endSegmentIndex) {
                     pageZeroSegmentsPages.set(startSegmentIndex, endSegmentIndex + 1);
@@ -363,6 +378,11 @@
     }
 
     @Override
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        segmentBuffers.unPinNotRequiredSegments(pageZeroSegmentsPages, numberOfPageZeroSegments);
+    }
+
+    @Override
     public void printPageZeroReaderInfo() {
         ColumnarValueException ex = new ColumnarValueException();
         ObjectNode readerNode = ex.createNode(getClass().getSimpleName());
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
index 695ee6e..5753632 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/zero/writers/multipage/SparseColumnMultiPageZeroWriter.java
@@ -96,8 +96,7 @@
         segments = new MultiPersistentPageZeroBufferBytesOutputStream(multiPageOpRef);
         this.zerothSegmentMaxColumns = zerothSegmentMaxColumns;
         this.zerothSegmentWriter = new SparseColumnPageZeroWriter();
-        this.maximumNumberOfColumnsInAPage = bufferCachePageSize
-                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
+        this.maximumNumberOfColumnsInAPage = getMaximumNumberOfColumnsInAPage(bufferCachePageSize);
     }
 
     @Override
@@ -282,7 +281,12 @@
         return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
     }
 
-    public static int getHeaderSpace(int numberOfPageZeroSegments) {
-        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfPageZeroSegments * Integer.BYTES;
+    public static int getHeaderSpace(int numberOfExtraPagesRequired) {
+        return MAX_COLUMNS_INDEX_IN_ZEROTH_SEGMENT_OFFSET + numberOfExtraPagesRequired * Integer.BYTES;
+    }
+
+    public static int getMaximumNumberOfColumnsInAPage(int bufferCachePageSize) {
+        return bufferCachePageSize
+                / (SparseColumnPageZeroWriter.COLUMN_OFFSET_SIZE + SparseColumnPageZeroWriter.FILTER_SIZE);
     }
 }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 6e85d29..1ef865e 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -202,8 +202,10 @@
         }
         //Reserved for the number of pages
         int requiredFreeSpace = HEADER_SIZE;
+        // This test uses the DEFAULT writer, which does not need bufferCapacity in the calculation
+        int bufferCapacity = Integer.MAX_VALUE;
         //Columns' Offsets
-        requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, true,
+        requiredFreeSpace += columnWriter.getPageZeroWriterOccupiedSpace(100, bufferCapacity, true,
                 IColumnPageZeroWriter.ColumnPageZeroWriterType.DEFAULT);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index f5cef05..8b3abd6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -64,7 +64,7 @@
      *
      * @return the size needed to store columns' offsets
      */
-    public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment,
+    public abstract int getPageZeroWriterOccupiedSpace(int maxColumnsInPageZerothSegment, int bufferCapacity,
             boolean includeCurrentTupleColumns, IColumnPageZeroWriter.ColumnPageZeroWriterType adaptive);
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
index 6cf43bf..87b35bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -107,65 +107,69 @@
      */
     public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
             BitSet cloudOnlyColumns) throws HyracksDataException {
-        // Set leafFrame
-        this.leafFrame = leafFrame;
-        // Ensure arrays capacities (given the leafFrame's columns and pages)
-        init();
+        try {
+            // Set leafFrame
+            this.leafFrame = leafFrame;
+            // Ensure arrays capacities (given the leafFrame's columns and pages)
+            init();
 
-        // Get the number of columns in a page
-        int numberOfColumns = leafFrame.getNumberOfColumns();
-        // Set the first 32-bits to the offset and the second 32-bits to columnIndex
-        leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+            int numberOfPresentColumnsInLeaf = leafFrame.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
 
-        // Set artificial offset to determine the last column's length
-        int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
-        offsetColumnIndexPairs[numberOfColumns] = IntPairUtil.of(megaLeafLength, numberOfColumns);
+            // Set artificial offset to determine the last column's length
+            int megaLeafLength = leafFrame.getMegaLeafNodeLengthInBytes();
+            offsetColumnIndexPairs[numberOfPresentColumnsInLeaf] =
+                    IntPairUtil.of(megaLeafLength, numberOfPresentColumnsInLeaf);
 
-        // Sort the pairs by offset (i.e., lowest offset first)
-        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+            // Sort the pairs by offset (i.e., lowest offset first)
+            LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfPresentColumnsInLeaf, OFFSET_COMPARATOR);
 
-        int columnOrdinal = 0;
-        for (int i = 0; i < numberOfColumns; i++) {
-            if (offsetColumnIndexPairs[i] == 0) {
-                //Any requested column's offset can't be zero
-                //In case a column is not being present in the accessed pageZero segments, it will be defaulted to 0
-                continue;
-            }
-            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
-            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
-            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+            int columnOrdinal = 0;
+            for (int i = 0; i < numberOfPresentColumnsInLeaf; i++) {
+                if (offsetColumnIndexPairs[i] == 0) {
+                    //Any requested column's offset can't be zero
+                    //In case a column is not being present in the accessed pageZero segments, it will be defaulted to 0
+                    continue;
+                }
+                int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+                int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+                int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
 
-            // Compute the column's length in bytes (set 0 for PKs)
-            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
-            // In case of sparse columns, few columnIndexes can be greater than the total sparse column count.
-            ensureCapacity(columnIndex);
-            lengths[columnIndex] = length;
+                // Compute the column's length in bytes (set 0 for PKs)
+                int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+                // In case of sparse columns, few columnIndexes can be greater than the total sparse column count.
+                ensureCapacity(columnIndex);
+                lengths[columnIndex] = length;
 
-            // Get start page ID (given the computed length above)
-            int startPageId = getColumnStartPageIndex(columnIndex);
-            // Get the number of pages (given the computed length above)
-            int numberOfPages = getColumnNumberOfPages(columnIndex);
+                // Get start page ID (given the computed length above)
+                int startPageId = getColumnStartPageIndex(columnIndex);
+                // Get the number of pages (given the computed length above)
+                int numberOfPages = getColumnNumberOfPages(columnIndex);
 
-            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
-                // Set column index
-                columnsOrder[columnOrdinal++] = columnIndex;
-                // Compute cloud-only and evictable pages
-                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
-                        numberOfPages);
-                // A requested column. Keep its pages as requested
-                continue;
+                if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                    // Set column index
+                    columnsOrder[columnOrdinal++] = columnIndex;
+                    // Compute cloud-only and evictable pages
+                    setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                            numberOfPages);
+                    // A requested column. Keep its pages as requested
+                    continue;
+                }
+
+                // Mark the page as non-evictable
+                for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                    nonEvictablePages.set(j);
+                }
             }
 
-            // Mark the page as non-evictable
-            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
-                nonEvictablePages.set(j);
-            }
+            // Bound nonEvictablePages by the number of pages in the mega leaf node
+            nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+            // to indicate the end
+            columnsOrder[columnOrdinal] = -1;
+        } finally {
+            // Unpin the page zero segments that are not required
+            leafFrame.unPinNotRequiredPageZeroSegments();
         }
-
-        // Bound the nonRequestedPages to the number of pages in the mega leaf node
-        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
-        // to indicate the end
-        columnsOrder[columnOrdinal] = -1;
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 2cefab3..181aa06 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -138,13 +138,17 @@
         }
 
         // pin the required page segments
-        mergedPageRanges.clear();
+        //        mergedPageRanges.clear();
         int pageZeroId = leafFrame.getPageId();
+        // Pin all the segments of the page zero, because the column eviction
+        // logic is based on the lengths of the columns, which are computed
+        // from the page zero segments.
         BitSet pageZeroSegmentRanges =
                 leafFrame.markRequiredPageZeroSegments(projectedColumns, pageZeroId, operation == MERGE);
-        // Merge the page zero segments ranges
-        mergePageZeroSegmentRanges(pageZeroSegmentRanges);
-        mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
+        // The non-required segments are unpinned after columnRanges.reset().
+        // TODO: can this be done lazily?
+        int numberOfPageZeroSegments = leafFrame.getNumberOfPageZeroSegments();
+        pinAll(fileId, pageZeroId, numberOfPageZeroSegments - 1, bufferCache);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 8439373..9ca57dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -133,8 +133,8 @@
         }
         //Columns' Offsets
         columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        int requiredFreeSpace =
-                columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, true, pageZeroWriterType);
+        int requiredFreeSpace = columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment,
+                columnarFrame.getBuffer().capacity(), true, pageZeroWriterType);
         //Occupied space from previous writes
         requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
         //min and max tuples' sizes
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
index c3f8228..3307423 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeReadLeafFrame.java
@@ -119,6 +119,7 @@
     public int getColumnOffset(int columnIndex) throws HyracksDataException {
         // update the exception message.
         if (!columnPageZeroReader.isValidColumn(columnIndex)) {
+            printPageZeroReaderInfo();
             throw new IndexOutOfBoundsException(columnIndex + " >= " + getNumberOfColumns());
         }
         return columnPageZeroReader.getColumnOffset(columnIndex);
@@ -177,8 +178,8 @@
         throw new IllegalArgumentException("Use createTupleReference(int)");
     }
 
-    public void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
-        columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
+    public int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs) {
+        return columnPageZeroReader.populateOffsetColumnIndexPairs(offsetColumnIndexPairs);
     }
 
     public BitSet getPageZeroSegmentsPages() {
@@ -189,6 +190,10 @@
         return columnPageZeroReader.markRequiredPageSegments(projectedColumns, pageZeroId, markAll);
     }
 
+    public void unPinNotRequiredPageZeroSegments() throws HyracksDataException {
+        columnPageZeroReader.unPinNotRequiredPageZeroSegments();
+    }
+
     public void printPageZeroReaderInfo() {
         columnPageZeroReader.printPageZeroReaderInfo();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
index d9cf11b..f57eba3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/IColumnPageZeroReader.java
@@ -64,7 +64,7 @@
 
     ByteBuffer getPageZeroBuf();
 
-    void populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
+    int populateOffsetColumnIndexPairs(long[] offsetColumnIndexPairs);
 
     int getNumberOfPageZeroSegments();
 
@@ -77,4 +77,6 @@
     BitSet markRequiredPageSegments(BitSet projectedColumns, int pageZeroId, boolean markAll);
 
     void printPageZeroReaderInfo();
+
+    void unPinNotRequiredPageZeroSegments() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
index 1556bea..c60ca79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiPageZeroBufferProvider.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.longs.LongSet;
 
 public class ColumnMultiPageZeroBufferProvider implements IColumnBufferProvider {
@@ -75,14 +76,13 @@
             // will do on request basis? or prefetch all the segments?
             return;
         }
-        // traverse the set segments and read the pages
-        int currentIndex = pageZeroSegmentsPages.nextSetBit(0);
-        while (currentIndex != -1) {
-            int segmentIndex = currentIndex - 1; // segmentIndex starts from 1
+        // Since all the page zero segments are pinned for calculating the lengths of the columns,
+        // read all the segments and store them in the buffers list.
+        // After ColumnRanges.reset(), the segments that are not required are unpinned.
+        for (int segmentIndex = 0; segmentIndex < numberOfRemainingPages; segmentIndex++) {
             ByteBuffer buffer = read(segmentIndex);
             segmentDir.put(segmentIndex, buffers.size());
             buffers.add(buffer);
-            currentIndex = pageZeroSegmentsPages.nextSetBit(currentIndex + 1);
         }
     }
 
@@ -97,11 +97,41 @@
     @Override
     public void releaseAll() throws HyracksDataException {
         for (ICachedPage page : pages) {
-            multiPageOp.unpin(page);
+            if (page != null) {
+                multiPageOp.unpin(page);
+            }
         }
         pages.clear();
     }
 
+    public void releasePages(IntList notRequiredSegmentsIndexes) throws HyracksDataException {
+        // Unpin the given pages and clear their entries in the list of cached pages.
+        // The pages and buffers lists are in sync, so the same indexes apply to both.
+        Throwable th = null;
+        for (int pageIndex : notRequiredSegmentsIndexes) {
+            if (pageIndex < 0 || pageIndex >= pages.size()) {
+                throw new IndexOutOfBoundsException("Page index out of bounds: " + pageIndex);
+            }
+            try {
+                ICachedPage page = pages.get(pageIndex);
+                if (page != null) {
+                    multiPageOp.unpin(page);
+                    pinnedPages.remove(((CachedPage) page).getDiskPageId());
+                    pages.set(pageIndex, null); // Clear the reference
+                }
+            } catch (Exception e) {
+                if (th == null) {
+                    th = e;
+                } else {
+                    th.addSuppressed(e);
+                }
+            }
+        }
+        if (th != null) {
+            throw HyracksDataException.create(th);
+        }
+    }
+
     @Override
     public ByteBuffer getBuffer() {
         throw new UnsupportedOperationException("getBuffer() is not supported for multi-page zero buffer provider.");
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index de4c4a9..66dbcf5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -131,6 +131,11 @@
                 break;
             }
         }
+
+        if (matchingTupleCount == 0 && retainInput && retainMissing) {
+            int end = accessor.getTupleCount();
+            appendMissingTuple(0, end);
+        }
         stats.getInputTupleCounter().update(matchingTupleCount);
     }