Merge "merge branch gerrit/trinity into gerrit/ionic" into ionic
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index a101bed..bd9e329 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -120,6 +121,7 @@
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new ParquetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor
                 .add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
         // Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 1622c6e..c4210a3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -26,7 +26,9 @@
 import java.util.Set;
 
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
@@ -84,8 +86,14 @@
 
     public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
             LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
-        ScanDefineDescriptor scanDefDesc =
-                new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+        ScanDefineDescriptor scanDefDesc;
+        if (DatasetUtil.isParquetFormat(dataset)) {
+            scanDefDesc = new ParquetDatasetScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable,
+                    metaVariable, scanOperator);
+        } else {
+            scanDefDesc = new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable,
+                    scanOperator);
+        }
         defineChain.put(recordVariable, scanDefDesc);
         useChain.put(recordVariable, new ArrayList<>());
         if (metaVariable != null) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 023e4da..cf438d8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -31,12 +31,14 @@
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
 import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -121,6 +123,12 @@
         Map<String, String> configuration = ((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties();
         boolean embedFilterValues = ExternalDataPrefix.containsComputedFields(configuration) && Boolean.parseBoolean(
                 configuration.getOrDefault(ExternalDataConstants.KEY_EMBED_FILTER_VALUES, ExternalDataConstants.TRUE));
+        if (DatasetUtil.isParquetFormat(dataset)) {
+            return new ParquetExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
+                    scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(),
+                    ((ParquetDatasetScanDefineDescriptor) scanDefineDescriptor).getRowGroupFilterExpression(),
+                    embedFilterValues);
+        }
         return new ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
                 scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), embedFilterValues);
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
new file mode 100644
index 0000000..4b4d742
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.List;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public class ParquetDatasetScanDefineDescriptor extends ScanDefineDescriptor {
+
+    private ILogicalExpression rowGroupFilterExpression;
+
+    public ParquetDatasetScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+            LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+        super(scope, dataset, primaryKeyVariables, recordVariable, metaRecordVariable, operator);
+        this.rowGroupFilterExpression = null;
+    }
+
+    public ILogicalExpression getRowGroupFilterExpression() {
+        return rowGroupFilterExpression;
+    }
+
+    public void setRowGroupFilterExpression(ILogicalExpression rowGroupFilterExpression) {
+        this.rowGroupFilterExpression = rowGroupFilterExpression;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 1ec64d5..0d79767 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -61,7 +61,7 @@
     protected final ExpressionToExpectedSchemaNodeVisitor exprToNodeVisitor;
     protected final ColumnFilterPathBuilderVisitor pathBuilderVisitor;
     protected final Map<ILogicalExpression, ARecordType> paths;
-    private final ArrayPathCheckerVisitor checkerVisitor;
+    protected final ArrayPathCheckerVisitor checkerVisitor;
 
     public ColumnFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
         super(pushdownContext, context);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
new file mode 100644
index 0000000..25db9f9
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ParquetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+    public ParquetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+        return !DatasetUtil.isParquetFormat(scanDefineDescriptor.getDataset());
+    }
+
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid);
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return true;
+    }
+
+    @Override
+    protected void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor, ILogicalExpression inlinedExpr)
+            throws AlgebricksException {
+        if (checkerVisitor.containsMultipleArrayPaths(paths.values())) {
+            // Cannot pushdown a filter with multiple unnest
+            // TODO allow rewindable column readers for filters
+            // TODO this is a bit conservative (maybe too conservative) as we can push part of expression down
+            return;
+        }
+        ParquetDatasetScanDefineDescriptor scanDefDesc = (ParquetDatasetScanDefineDescriptor) scanDefineDescriptor;
+        ILogicalExpression filterExpr = scanDefDesc.getRowGroupFilterExpression();
+        if (filterExpr != null) {
+            filterExpr = andExpression(filterExpr, inlinedExpr);
+            scanDefDesc.setRowGroupFilterExpression(filterExpr);
+        } else {
+            scanDefDesc.setRowGroupFilterExpression(inlinedExpr);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index c801283..f74a995 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -490,7 +490,7 @@
       <id>asterix-gerrit-asterix-app</id>
       <properties>
         <test.excludes>
-          **/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
+          **/CloudStorageTest.java,**/CloudStorageCancellationTest.java,**/CloudStorageGCSTest.java,**/CloudStorageUnstableTest.java,
           **/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,
           **/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,
           **/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,
@@ -611,6 +611,7 @@
       <properties>
         <test.includes>
           **/CloudStorageTest.java,
+          **/CloudStorageCancellationTest.java,
           **/SqlppSinglePointLookupExecutionTest.java, **/AwsS3*.java
         </test.includes>
         <failIfNoTests>false</failIfNoTests>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
new file mode 100644
index 0000000..2e1f94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageCancellationTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.DELTA_RESULT_PATH;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.EXCLUDED_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.ONLY_TESTS;
+import static org.apache.asterix.test.cloud_storage.CloudStorageTest.SUITE_TESTS;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
+import org.apache.asterix.test.common.CancellationTestExecutor;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageCancellationTest {
+
+    private final TestCaseContext tcCtx;
+
+    public CloudStorageCancellationTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestExecutor testExecutor = new CancellationTestExecutor(DELTA_RESULT_PATH);
+        CloudStorageTest.setupEnv(testExecutor);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+        LocalCloudUtilAdobeMock.shutdownSilently();
+    }
+
+    @Parameters(name = "CloudStorageCancellationTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index 498f060..3405838 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -57,16 +57,16 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TestCaseContext tcCtx;
-    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
-    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
-    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
-    private static final String DELTA_RESULT_PATH = "results_cloud";
-    private static final String EXCLUDED_TESTS = "MP";
+    public static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    public static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    public static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
+    public static final String DELTA_RESULT_PATH = "results_cloud";
+    public static final String EXCLUDED_TESTS = "MP";
 
-    private static final String PLAYGROUND_CONTAINER = "playground";
-    private static final String MOCK_SERVER_REGION = "us-west-2";
-    private static final int MOCK_SERVER_PORT = 8001;
-    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    public static final String MOCK_SERVER_REGION = "us-west-2";
+    public static final int MOCK_SERVER_PORT = 8001;
+    public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
 
     public CloudStorageTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
@@ -74,8 +74,12 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        setupEnv(testExecutor);
+    }
+
+    public static void setupEnv(TestExecutor testExecutor) throws Exception {
+        LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
         testExecutor.executorId = "cloud";
         testExecutor.stripSubstring = "//DB:";
         LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index b4b7352..dac71ce 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -49,6 +49,14 @@
 
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
+    public CancellationTestExecutor() {
+        super();
+    }
+
+    public CancellationTestExecutor(String deltaPath) {
+        super(deltaPath);
+    }
+
     @Override
     public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
             List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Charset responseCharset,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index c6d7ba9..29061ec 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -17,6 +17,7 @@
     "cloud.max.read.requests.per.second" : 1500,
     "cloud.max.write.requests.per.second" : 250,
     "cloud.profiler.log.interval" : 5,
+    "cloud.requests.max.http.connections" : 1000,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 562e195..f2ea15b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -17,6 +17,7 @@
     "cloud.max.read.requests.per.second" : 1500,
     "cloud.max.write.requests.per.second" : 250,
     "cloud.profiler.log.interval" : 5,
+    "cloud.requests.max.http.connections" : 1000,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 132fa5b..685d28b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -17,6 +17,7 @@
     "cloud.max.read.requests.per.second" : 1500,
     "cloud.max.write.requests.per.second" : 250,
     "cloud.profiler.log.interval" : 5,
+    "cloud.requests.max.http.connections" : 1000,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
index b560a5b..805b9aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
index 2bfad06..a57424c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
index afb74f1..5e23176 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department embed-filter-value: true row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
index fc48056..5832f4f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
index 8dcaa95..ecbd5ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
index a99cc0c..35da948 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
index a7c5727..23892f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
index b48def6..9fb8ee1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
index 2b41307..9dab9c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
@@ -14,7 +14,7 @@
             -- STREAM_SELECT  |PARTITIONED|
               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                data-scan []<-[$$p1] <- test.ParquetDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                data-scan []<-[$$p1] <- test.ParquetDataset1 row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- DATASOURCE_SCAN  |PARTITIONED|
                   exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
index 3993a4d..2fca35d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
@@ -36,7 +36,7 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                             -- DATASOURCE_SCAN  |PARTITIONED|
                               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
index 9b93e19..945f2da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
@@ -36,7 +36,7 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                             -- DATASOURCE_SCAN  |PARTITIONED|
                               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
index cccf9a3..6941720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
@@ -44,7 +44,7 @@
                     -- ASSIGN  |PARTITIONED|
                       exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                         -- DATASOURCE_SCAN  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
index cf265bf..316e465 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
@@ -44,7 +44,7 @@
                     -- ASSIGN  |PARTITIONED|
                       exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                         -- DATASOURCE_SCAN  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 6f952b0..419ac4d 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -247,6 +247,10 @@
             <artifactId>aws-crt</artifactId>
         </dependency>
         <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 0b9b15c..20727de 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -41,15 +41,16 @@
     private final long tokenAcquireTimeout;
     private final int readMaxRequestsPerSeconds;
     private final int writeMaxRequestsPerSeconds;
+    private final int requestsMaxHttpConnections;
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize) {
-        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0);
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0);
     }
 
     private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
-            int readMaxRequestsPerSeconds) {
+            int readMaxRequestsPerSeconds, int requestsMaxHttpConnections) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -59,6 +60,7 @@
         this.tokenAcquireTimeout = tokenAcquireTimeout;
         this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+        this.requestsMaxHttpConnections = requestsMaxHttpConnections;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -66,7 +68,7 @@
                 cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
                 cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
                 cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
-                cloudProperties.getReadMaxRequestsPerSecond());
+                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -124,6 +126,10 @@
         return readMaxRequestsPerSeconds;
     }
 
+    public int getRequestsMaxHttpConnections() {
+        return requestsMaxHttpConnections;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 224ede4..01a8b02 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -63,6 +63,7 @@
 
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
@@ -325,6 +326,10 @@
         S3ClientBuilder builder = S3Client.builder();
         builder.credentialsProvider(config.createCredentialsProvider());
         builder.region(Region.of(config.getRegion()));
+        if (config.getRequestsMaxHttpConnections() > 0) {
+            builder.httpClientBuilder(
+                    ApacheHttpClient.builder().maxConnections(config.getRequestsMaxHttpConnections()));
+        }
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             URI uri;
             try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 7b6fb6e..1e3fe75 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -64,7 +64,8 @@
         CLOUD_WRITE_BUFFER_SIZE(
                 getRangedIntegerType(5, Integer.MAX_VALUE),
                 StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE)),
-        CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50);
+        CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
+        CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -96,6 +97,7 @@
                 case CLOUD_MAX_READ_REQUESTS_PER_SECOND:
                 case CLOUD_WRITE_BUFFER_SIZE:
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
+                case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -163,6 +165,8 @@
                     return "The write buffer size in bytes. (default: 8MB, min: 5MB)";
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
                     return "The number of cloud reads for re-evaluating an eviction plan. (default: 50)";
+                case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
+                    return "The maximum number of HTTP connections to use for cloud requests per node. (default: 1000)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -258,4 +262,8 @@
     public int getEvictionPlanReevaluationThreshold() {
         return accessor.getInt(Option.CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD);
     }
+
+    public int getRequestsMaxHttpConnections() {
+        return accessor.getInt(Option.CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 7f3642b..2803f21 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.transactions.ILogManager;
@@ -47,6 +48,8 @@
     private final int datasetID;
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
+    private final AtomicInteger failedFlushes = new AtomicInteger();
+    private final AtomicInteger failedMerges = new AtomicInteger();
     private int numActiveIOOps;
     private int pendingFlushes;
     private int pendingMerges;
@@ -317,4 +320,25 @@
     public synchronized int getPendingReplications() {
         return pendingReplications;
     }
+
+    public int getFailedFlushes() {
+        return failedFlushes.get();
+    }
+
+    public int getFailedMerges() {
+        return failedMerges.get();
+    }
+
+    public void incrementFailedIoOp(ILSMIOOperation.LSMIOOperationType operation) {
+        switch (operation) {
+            case FLUSH:
+                failedFlushes.incrementAndGet();
+                break;
+            case MERGE:
+                failedMerges.incrementAndGet();
+                break;
+            default:
+                break;
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index fc27bf6..0128297 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -955,6 +955,8 @@
             stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
             stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
             stats.addPendingReplications(dsr.getDatasetInfo().getPendingReplications());
+            stats.addFailedFlushes(dsr.getDatasetInfo().getFailedFlushes());
+            stats.addFailedMerges(dsr.getDatasetInfo().getFailedMerges());
         }
         return stats;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 52af329..6887782 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -143,6 +143,7 @@
 
     @Override
     public void afterFailure(ILSMIOOperation operation) {
+        dsInfo.incrementFailedIoOp(operation.getIOOperationType());
         if (isMerge(operation)) {
             try {
                 ioManager.delete(getOperationMaskFilePath(operation));
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
index b58f63d..2e2a4a7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
@@ -23,6 +23,8 @@
     private int pendingFlushes;
     private int pendingMerges;
     private int pendingReplications;
+    private int failedFlushes;
+    private int failedMerges;
 
     public void addPendingFlushes(int pending) {
         pendingFlushes += pending;
@@ -36,6 +38,22 @@
         pendingReplications += pending;
     }
 
+    public void addFailedFlushes(int failed) {
+        failedFlushes += failed;
+    }
+
+    public void addFailedMerges(int failed) {
+        failedMerges += failed;
+    }
+
+    public int getFailedFlushes() {
+        return failedFlushes;
+    }
+
+    public int getFailedMerges() {
+        return failedMerges;
+    }
+
     public int getPendingFlushes() {
         return pendingFlushes;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
new file mode 100644
index 0000000..774e908
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+public class ParquetFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final FilterPredicate filterExpression;
+    private final IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory;
+
+    public ParquetFilterEvaluatorFactory(IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory,
+            FilterPredicate expression) {
+        this.externalFilterEvaluatorFactory = externalFilterEvaluatorFactory;
+        this.filterExpression = expression;
+    }
+
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        return externalFilterEvaluatorFactory.create(serviceContext, warningCollector);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return externalFilterEvaluatorFactory.createValueEmbedder(warningCollector);
+    }
+
+    public FilterPredicate getFilterExpression() {
+        return filterExpression;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 2d92e10..f5dc7a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -44,6 +45,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.amazonaws.SdkBaseException;
 
@@ -91,6 +94,13 @@
             IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
             configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
             configureHdfsConf(conf, configuration);
+            if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+                FilterPredicate parquetFilterPredicate =
+                        ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+                if (parquetFilterPredicate != null) {
+                    ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+                }
+            }
         } catch (SdkException | SdkBaseException ex) {
             throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
         } catch (AlgebricksException ex) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 530ce74..72c9977 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -41,6 +42,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobItem;
@@ -82,6 +85,13 @@
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 4dedb08..1db4445 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -40,6 +41,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.PathItem;
@@ -69,6 +72,13 @@
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 874c3bd..9469747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -39,6 +40,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.google.cloud.storage.Blob;
 
@@ -76,6 +79,13 @@
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
         GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2551b86..0cd150e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -854,4 +854,9 @@
         return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
                 .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
     }
+
+    public static boolean isParquetFormat(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+                .isParquetFormat(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 5939290..dfefb8f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.metadata.utils;
 
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
@@ -43,6 +44,7 @@
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
 import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -53,6 +55,7 @@
 import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.types.ARecordType;
@@ -61,6 +64,7 @@
 import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -346,6 +350,25 @@
                         (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
                 return builder.build();
             }
+        } else if (isParquetFormat(properties)) {
+            if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+                return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+            } else {
+                ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+                ExternalDatasetProjectionFiltrationInfo pfi =
+                        (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+                IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
+                        NoOpExternalFilterEvaluatorFactory.INSTANCE;
+                if (!prefix.getPaths().isEmpty()) {
+                    ExternalFilterBuilder externalFilterBuilder =
+                            new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+                    externalFilterEvaluatorFactory = externalFilterBuilder.build();
+                }
+                ParquetFilterBuilder builder = new ParquetFilterBuilder(
+                        (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+                return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
+                        builder.buildFilterPredicate());
+            }
         } else {
             if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
                 return NoOpExternalFilterEvaluatorFactory.INSTANCE;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
new file mode 100644
index 0000000..38ad119
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Predicate;
+
+public class ParquetFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    public ParquetFilterBuilder(ParquetExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getParquetRowGroupFilterExpression(),
+                context, typeEnv);
+    }
+
+    public FilterPredicate buildFilterPredicate() throws AlgebricksException {
+        FilterPredicate parquetFilterPredicate = null;
+        if (filterExpression != null) {
+            try {
+                parquetFilterPredicate = createFilterExpression(filterExpression);
+            } catch (Exception e) {
+                LOGGER.error("Error creating Parquet row-group filter expression ", e);
+            }
+        }
+        if (parquetFilterPredicate != null && !(parquetFilterPredicate instanceof Predicate)) {
+            parquetFilterPredicate = null;
+        }
+        return parquetFilterPredicate;
+    }
+
+    private FilterPredicate createComparisonExpression(ILogicalExpression columnName, ILogicalExpression constValue,
+            FunctionIdentifier fid) throws AlgebricksException {
+        ConstantExpression constExpr = (ConstantExpression) constValue;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        String fieldName = createColumnExpression(columnName);
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return createComparisionFunction(FilterApi.binaryColumn(fieldName),
+                        Binary.fromString(((AString) constantValue.getObject()).getStringValue()), fid);
+            case TINYINT:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt8) constantValue.getObject()).getByteValue(), fid);
+            case SMALLINT:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt16) constantValue.getObject()).getShortValue(), fid);
+            case INTEGER:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        ((AInt32) constantValue.getObject()).getIntegerValue(), fid);
+            case BOOLEAN:
+                if (!fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+                    throw new RuntimeException("Unsupported comparison function: " + fid);
+                }
+                return FilterApi.eq(FilterApi.booleanColumn(fieldName), constantValue.isTrue());
+            case BIGINT:
+                return createComparisionFunction(FilterApi.longColumn(fieldName),
+                        ((AInt64) constantValue.getObject()).getLongValue(), fid);
+            case DOUBLE:
+                return createComparisionFunction(FilterApi.doubleColumn(fieldName),
+                        ((ADouble) constantValue.getObject()).getDoubleValue(), fid);
+            case DATE:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        ((ADate) constantValue.getObject()).getChrononTimeInDays(), fid);
+            case DATETIME:
+                Long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return createComparisionFunction(FilterApi.longColumn(fieldName),
+                        TimeUnit.MILLISECONDS.toMicros(millis), fid);
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    private FilterPredicate createFilterExpression(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null || expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new RuntimeException("Unsupported expression: " + expr);
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        if (funcExpr.getArguments().size() != 2
+                && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return createAndOrPredicate(fid, args, 0);
+        } else {
+            return createComparisonExpression(args.get(0).getValue(), args.get(1).getValue(), fid);
+        }
+    }
+
+    private <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate createComparisionFunction(
+            C column, T value, FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return FilterApi.eq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return FilterApi.gtEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return FilterApi.gt(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return FilterApi.ltEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return FilterApi.lt(column, value);
+        } else {
+            throw new RuntimeException("Unsupported function: " + fid);
+        }
+    }
+
+    protected String createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            // The field could be a nested field
+            List<String> fieldList = new ArrayList<>();
+            fieldList = createPathExpression(path, fieldList);
+            return String.join(".", fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return path.getFieldNames()[0];
+        } else {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        }
+    }
+
+    private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Error creating column expression");
+        } else {
+            fieldList.add(path.getFieldNames()[0]);
+        }
+        if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return fieldList;
+        } else {
+            throw new RuntimeException("Error creating column expression");
+        }
+    }
+
+    // Converts or(pred1, pred2, pred3) to or(pred1, or(pred2, pred3))
+    private FilterPredicate createAndOrPredicate(FunctionIdentifier function, List<Mutable<ILogicalExpression>> args,
+            int index) throws AlgebricksException {
+        if (index == args.size() - 2) {
+            if (function.equals(AlgebricksBuiltinFunctions.AND)) {
+                return FilterApi.and(createFilterExpression(args.get(0).getValue()),
+                        createFilterExpression(args.get(1).getValue()));
+            } else {
+                return FilterApi.or(createFilterExpression(args.get(0).getValue()),
+                        createFilterExpression(args.get(1).getValue()));
+            }
+        } else {
+            if (function.equals(AlgebricksBuiltinFunctions.AND)) {
+                return FilterApi.and(createFilterExpression(args.get(index).getValue()),
+                        createAndOrPredicate(function, args, index + 1));
+            } else {
+                return FilterApi.or(createFilterExpression(args.get(index).getValue()),
+                        createAndOrPredicate(function, args, index + 1));
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index e0ea99a..f95483e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -43,7 +43,7 @@
     protected final ARecordType projectedType;
     protected final ILogicalExpression filterExpression;
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
-    private final boolean embedFilterValues;
+    protected final boolean embedFilterValues;
     protected final Map<ILogicalExpression, ARecordType> filterPaths;
 
     public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
new file mode 100644
index 0000000..323d3ed
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class ParquetExternalDatasetProjectionFiltrationInfo extends ExternalDatasetProjectionFiltrationInfo {
+
+    private ILogicalExpression parquetRowGroupFilterExpression;
+
+    public ParquetExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+            Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
+            ILogicalExpression filterExpression, ILogicalExpression parquetRowGroupFilterExpression,
+            boolean embedFilterValues) {
+        super(projectedType, sourceInformationMap, filterPaths, filterExpression, embedFilterValues);
+        this.parquetRowGroupFilterExpression = parquetRowGroupFilterExpression;
+    }
+
+    private ParquetExternalDatasetProjectionFiltrationInfo(ParquetExternalDatasetProjectionFiltrationInfo other) {
+        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression,
+                other.embedFilterValues);
+        this.parquetRowGroupFilterExpression = other.parquetRowGroupFilterExpression;
+    }
+
+    public ILogicalExpression getParquetRowGroupFilterExpression() {
+        return parquetRowGroupFilterExpression;
+    }
+
+    @Override
+    public void print(AlgebricksStringBuilderWriter writer) {
+        super.print(writer);
+        if (parquetRowGroupFilterExpression != null) {
+            writer.append(" row-group-filter on: ");
+            writer.append(parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public void print(JsonGenerator generator) throws IOException {
+        super.print(generator);
+        if (parquetRowGroupFilterExpression != null) {
+            generator.writeStringField("row-group-filter-on", parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public ParquetExternalDatasetProjectionFiltrationInfo createCopy() {
+        return new ParquetExternalDatasetProjectionFiltrationInfo(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetExternalDatasetProjectionFiltrationInfo otherInfo = (ParquetExternalDatasetProjectionFiltrationInfo) o;
+        return super.equals(o)
+                && filterExpressionEquals(parquetRowGroupFilterExpression, otherInfo.parquetRowGroupFilterExpression);
+    }
+}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index e90283d..35f9219 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1635,6 +1635,11 @@
       </dependency>
       <dependency>
         <groupId>software.amazon.awssdk</groupId>
+        <artifactId>apache-client</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
         <artifactId>http-client-spi</artifactId>
         <version>${awsjavasdk.version}</version>
       </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index 7a272cc..8f95e34 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -49,6 +49,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-common</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6d9cd1..560f817 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -170,25 +171,21 @@
             PipelineAssembler pa =
                     new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
             startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
-            HyracksDataException exception = null;
+            Exception exception = null;
             try {
                 startOfPipeline.open();
             } catch (Exception e) {
-                startOfPipeline.fail();
-                exception = HyracksDataException.create(e);
+                exception = e;
             } finally {
-                try {
-                    startOfPipeline.close();
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = HyracksDataException.create(e);
-                    } else {
-                        exception.addSuppressed(e);
-                    }
+                if (exception != null) {
+                    exception = InvokeUtil.tryUninterruptibleWithCleanups(exception, startOfPipeline::fail,
+                            startOfPipeline::close);
+                } else {
+                    exception = InvokeUtil.runUninterruptible(exception, startOfPipeline::close);
                 }
             }
             if (exception != null) {
-                throw exception;
+                throw HyracksDataException.create(exception);
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ff0a3c5..9326427 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -285,6 +285,57 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static Exception tryUninterruptibleWithCleanups(Exception root, ThrowingAction action,
+            ThrowingAction... cleanups) {
+        try {
+            tryUninterruptibleWithCleanups(action, cleanups);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryUninterruptibleWithCleanups(ThrowingAction action, ThrowingAction... cleanups)
+            throws Exception {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            runUninterruptible(action);
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    runUninterruptible(cleanup);
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof Exception) {
+            throw (Exception) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, ThrowingConsumer<Throwable>... cleanups)
             throws Exception {
@@ -426,6 +477,19 @@
      * Runs the supplied action, after suspending any pending interruption. An error will be logged if
      * the action is itself interrupted.
      */
+    public static Exception runUninterruptible(Exception root, ThrowingAction action) {
+        try {
+            runUninterruptible(action);
+        } catch (Exception e) {
+            root = ExceptionUtils.suppress(root, e);
+        }
+        return root;
+    }
+
+    /**
+     * Runs the supplied action, after suspending any pending interruption. An error will be logged if
+     * the action is itself interrupted.
+     */
     public static void runUninterruptible(ThrowingAction action) throws Exception {
         boolean interrupted = Thread.interrupted();
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index c36e4a7..da9c8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -166,8 +166,12 @@
                     LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
                     Thread.currentThread().interrupt();
                 }
-                if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
-                    throw HyracksDataException.create(e);
+                try {
+                    if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
+                        throw HyracksDataException.create(e);
+                    }
+                } catch (InterruptedException interruptedEx) {
+                    throw HyracksDataException.create(interruptedEx);
                 }
                 attempt++;
                 retry.beforeRetry();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 0bf74ee..614a226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -63,8 +63,8 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -420,30 +420,29 @@
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
-                    try {
-                        writer.open();
-                        VSizeFrame frame = new VSizeFrame(this);
-                        while (reader.nextFrame(frame)) {
-                            if (aborted) {
-                                return;
-                            }
-                            ByteBuffer buffer = frame.getBuffer();
-                            writer.nextFrame(buffer);
-                            buffer.compact();
+                    writer.open();
+                    VSizeFrame frame = new VSizeFrame(this);
+                    while (reader.nextFrame(frame)) {
+                        if (aborted) {
+                            return;
                         }
-                    } catch (Exception e) {
-                        originalEx = e;
-                        CleanupUtils.fail(writer, originalEx);
-                    } finally {
-                        originalEx = CleanupUtils.closeSilently(writer, originalEx);
+                        ByteBuffer buffer = frame.getBuffer();
+                        writer.nextFrame(buffer);
+                        buffer.compact();
                     }
+                } catch (Exception e) {
+                    originalEx = e;
                 } finally {
-                    originalEx = CleanupUtils.closeSilently(reader, originalEx);
+                    if (originalEx != null) {
+                        InvokeUtil.tryUninterruptibleWithCleanups(writer::fail, writer::close, reader::close);
+                    } else {
+                        InvokeUtil.tryUninterruptibleWithCleanups(writer::close, reader::close);
+                    }
                 }
             } catch (Exception e) {
                 originalEx = ExceptionUtils.suppress(originalEx, e);
             } finally {
-                originalEx = CleanupUtils.closeSilently(collector, originalEx);
+                InvokeUtil.runUninterruptible(collector::close);
             }
         } catch (Exception e) {
             originalEx = ExceptionUtils.suppress(originalEx, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
index 9740bc4..9dd8611 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreePointSearchCursor.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
@@ -288,11 +289,15 @@
 
     protected void closeCursors() throws HyracksDataException {
         if (btreeCursors != null) {
+            Throwable failure = null;
             for (int i = 0; i < numBTrees; ++i) {
                 if (btreeCursors[i] != null) {
-                    btreeCursors[i].close();
+                    failure = ResourceReleaseUtils.close(btreeCursors[i], failure);
                 }
             }
+            if (failure != null) {
+                throw HyracksDataException.create(failure);
+            }
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 2c97221..d53e686 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -127,8 +128,12 @@
             }
 
             if (rangeCursors != null) {
+                Throwable failure = null;
                 for (int i = 0; i < rangeCursors.length; i++) {
-                    rangeCursors[i].close();
+                    failure = ResourceReleaseUtils.close(rangeCursors[i], failure);
+                }
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
                 }
             }
         } finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 52bd07e..7ec4f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -127,12 +127,17 @@
         if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // this thread is not expected to be interrupted; wrap defensively if it ever is
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d8900ad..ee63547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -98,12 +98,17 @@
         if (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
             IRetryPolicy policy = new ExponentialRetryPolicy();
             while (operation.getStatus() == ILSMIOOperation.LSMIOOperationStatus.FAILURE) {
-                if (policy.retry(operation.getFailure())) {
-                    operation.setFailure(null);
-                    operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
-                    lsmHarness.flush(operation);
-                } else {
-                    break;
+                try {
+                    if (policy.retry(operation.getFailure())) {
+                        operation.setFailure(null);
+                        operation.setStatus(ILSMIOOperation.LSMIOOperationStatus.SUCCESS);
+                        lsmHarness.flush(operation);
+                    } else {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    // this thread is not expected to be interrupted; wrap defensively if it ever is
+                    throw HyracksDataException.create(e);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 080b9ea..72a3bb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -84,17 +84,13 @@
     }
 
     @Override
-    public boolean retry(Throwable failure) {
+    public boolean retry(Throwable failure) throws InterruptedException {
         if (attempt < maxRetries) {
-            try {
-                long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
-                if (printDebugLines) {
-                    LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
-                }
-                TimeUnit.MILLISECONDS.sleep(sleepTime);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+            long sleepTime = ThreadLocalRandom.current().nextLong(1 + delay);
+            if (printDebugLines) {
+                LOGGER.info("Retrying after {}ms, attempt: {}/{}", sleepTime, attempt + 1, maxRetries);
             }
+            TimeUnit.MILLISECONDS.sleep(sleepTime);
             attempt++;
             delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
             return true;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
index 0d18a2b..2fd0191 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IRetryPolicy.java
@@ -24,6 +24,7 @@
      * @param failure
      *            the cause of the failure (this cannot be null)
      * @return true if one more attempt should be done
+     * @throws InterruptedException if the thread is interrupted while waiting before the next attempt
      */
-    boolean retry(Throwable failure);
+    boolean retry(Throwable failure) throws InterruptedException;
 }