Merge branch 'gerrit/goldfish' into 'master'
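
Adds a Delta Lake all-type table generator (DeltaAllTypeGenerator) with
runtime tests and expected results covering integer, string, decimal,
double, timestamp, and date columns, and passes a visited-operator set
through OperatorManipulationUtil.substituteVarRec at its optimizer call
sites.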

Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
index d2ac8e5..378c758 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
@@ -398,14 +398,17 @@
         if (!pushableNestedSubplan) {
             return false;
         }
-
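+        // Scratch set threaded through substituteVarRec; cleared between substitutions.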
+        Set<ILogicalOperator> visited = new HashSet<>();
         for (int i = 0; i < nspOp.getNestedPlans().size(); i++) {
             Mutable<ILogicalOperator> nspAggRef = nspOp.getNestedPlans().get(i).getRoots().get(0);
             AggregateOperator nspAgg = (AggregateOperator) nspAggRef.getValue();
             Mutable<ILogicalOperator> nspAggChildRef = nspAgg.getInputs().get(0);
             LogicalVariable listifyVar = findListifiedVariable(nspAgg, varFromNestedAgg);
             if (listifyVar != null) {
-                OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context);
+                OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context,
+                        visited);
+                visited.clear();
                 nspAgg.getVariables().addAll(aggInSubplanOp.getVariables());
                 nspAgg.getExpressions().addAll(aggInSubplanOp.getExpressions());
                 for (LogicalVariable v : aggInSubplanOp.getVariables()) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
index 6d92b51..28166a9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
@@ -120,12 +120,15 @@
         AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) opRefJoin.getValue();
         gby.getDecorList().clear();
         gby.getDecorList().addAll(decorToPush);
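+        // Scratch set reused across substituteVarRec calls; cleared after each substitution.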
+        Set<ILogicalOperator> visited = new HashSet<>();
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorNotToPush) {
             LogicalVariable v1 = p.first;
             if (v1 != null) {
                 VariableReferenceExpression varRef = (VariableReferenceExpression) p.second.getValue();
                 LogicalVariable v2 = varRef.getVariableReference();
-                OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context);
+                OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context, visited);
+                visited.clear();
             }
         }
         Mutable<ILogicalOperator> branchRef = join.getInputs().get(branch);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index db1bf50..07789b7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -37,6 +37,7 @@
 import java.util.Collection;
 
 import org.apache.asterix.test.external_dataset.avro.AvroFileConverterUtil;
+import org.apache.asterix.test.external_dataset.deltalake.DeltaAllTypeGenerator;
 import org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator;
 import org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -122,6 +123,8 @@
         // cleaning directory
         BinaryFileConverterUtil.cleanBinaryDirectory(basePath, DELTA_GEN_BASEDIR);
         DeltaTableGenerator.prepareDeltaTableContainer(new Configuration());
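+        // Also generate the all-type Delta table (delta_all_type) used by the type-coverage tests.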
+        DeltaAllTypeGenerator.createTableInsertData(new Configuration());
     }
 
     /**
@@ -434,6 +437,9 @@
         loadDeltaDirectory(generatedDataBasePath, "/modified_delta_table/_delta_log", JSON_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table", PARQUET_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
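+        // Upload the generated all-type table: its Delta log (JSON) and its data files (Parquet).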
+        loadDeltaDirectory(generatedDataBasePath, "/delta_all_type/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/delta_all_type", PARQUET_FILTER, "delta-data/");
     }
 
     private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java
new file mode 100644
index 0000000..5c302a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaAllTypeGenerator.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.deltalake;
+
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.DataWriteContext;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Table;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.DataFileStatus;
+
+public class DeltaAllTypeGenerator {
+    public static final DecimalType decimal_t = new DecimalType(10, 5);
+    protected static final StructType exampleTableSchema = new StructType().add("integer_type", IntegerType.INTEGER)
+            .add("string_type", StringType.STRING).add("decimal_type", decimal_t).add("double_type", DoubleType.DOUBLE)
+            .add("timestamp_type", TimestampType.TIMESTAMP).add("date_type", DateType.DATE);
+    public static final String DELTA_ALL_TYPE_TABLE_PATH =
+            "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_all_type";
+
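+    /**
+     * Creates the all-type Delta table and inserts a single five-row batch via
+     * the Delta Kernel write path: build a CREATE_TABLE transaction, transform
+     * the logical data, write the Parquet files, and commit the append actions.
+     */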
+    public static void createTableInsertData(Configuration conf) throws IOException {
+        Engine engine = DefaultEngine.create(conf);
+        Table table = Table.forPath(engine, DELTA_ALL_TYPE_TABLE_PATH);
+        TransactionBuilder txnBuilder = table.createTransactionBuilder(engine, "Examples", Operation.CREATE_TABLE);
+        txnBuilder = txnBuilder.withSchema(engine, exampleTableSchema);
+        Transaction txn = txnBuilder.build(engine);
+        Row txnState = txn.getTransactionState(engine);
+        ColumnVector[] vectors = new ColumnVector[exampleTableSchema.length()];
+        vectors[0] = intVector(Arrays.asList(123, 124, 125, 126, 127));
+        vectors[1] = stringVector(
+                Arrays.asList("FirstPerson", "SecondPerson", "ThirdPerson", "FourthPerson", "FifthPerson"));
+        vectors[2] = decimalVector(Arrays.asList(new BigDecimal("1.25432"), new BigDecimal("2666.223"),
+                new BigDecimal("1245.2421"), new BigDecimal("23731.2"), new BigDecimal("80911.222456")));
+        vectors[3] = doubleVector(Arrays.asList(100.34d, 200.055d, 300.02d, 400.21014d, 500.219d));
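+        // Timestamp values are microseconds since epoch; 1732010400000L ~= 1970-01-21T01:06:50.400Z.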
+        vectors[4] = timestampVector(
+                Arrays.asList(1732010400000L, 1732010400330L, 1732010400450L, 1732010403000L, 1732010401200L));
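+        // Date values are days since epoch; 127 ~= 1970-05-08, 23 ~= 1970-01-24.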
+        vectors[5] = dateVector(Arrays.asList(127, 23, 11, 456, 23));
+        ColumnarBatch batch = new DefaultColumnarBatch(5, exampleTableSchema, vectors);
+        FilteredColumnarBatch f1 = new FilteredColumnarBatch(batch, Optional.empty());
+        CloseableIterator<FilteredColumnarBatch> data = toCloseableIterator(Arrays.asList(f1).iterator());
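+        // Convert logical rows to the physical layout, write them as Parquet, then commit the append actions.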
+        CloseableIterator<FilteredColumnarBatch> physicalData =
+                Transaction.transformLogicalData(engine, txnState, data, Collections.emptyMap());
+        DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, Collections.emptyMap());
+        CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler().writeParquetFiles(
+                writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns());
+        CloseableIterator<Row> dataActions =
+                Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext);
+        CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(dataActions);
+        txn.commit(engine, dataActionsIterable);
+
+
+    static ColumnVector stringVector(List<String> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return StringType.STRING;
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return data.get(rowId) == null;
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return data.get(rowId);
+            }
+        };
+    }
+
+    static ColumnVector intVector(List<Integer> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return IntegerType.INTEGER;
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return false;
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                return data.get(rowId);
+            }
+        };
+    }
+
+    static ColumnVector doubleVector(List<Double> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return DoubleType.DOUBLE;
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return data.get(rowId) == null;
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return data.get(rowId);
+            }
+        };
+    }
+
+    static ColumnVector decimalVector(List<BigDecimal> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return decimal_t; // Shared DecimalType(10, 5) declared for the decimal_type column
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return data.get(rowId) == null;
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                // Delta Kernel consumes decimal values natively as BigDecimal, so return the value directly
+                return data.get(rowId);
+            }
+        };
+    }
+
+    // Timestamp values are microseconds since the Unix epoch (Delta TimestampType).
+    static ColumnVector timestampVector(List<Long> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return TimestampType.TIMESTAMP;
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return data.get(rowId) == null;
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                // Microseconds since epoch, per Delta's TimestampType.
+                return data.get(rowId);
+            }
+        };
+    }
+
+    // Date values are days since the Unix epoch (Delta DateType).
+    static ColumnVector dateVector(List<Integer> data) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return DateType.DATE;
+            }
+
+            @Override
+            public int getSize() {
+                return data.size();
+            }
+
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return data.get(rowId) == null;
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                // Days since epoch, per Delta's DateType.
+                return data.get(rowId);
+            }
+        };
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp
new file mode 100644
index 0000000..cdae363
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.00.ddl.sqlpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+ };
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset1(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_all_type"),
+   ("decimal-to-double" = "true"),
+   ("timestamp-to-long" = "false"),
+   ("date-to-int" = "false"),
+   ("timezone" =  "PST"),
+   ("table-format" = "delta")
+ );
+
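+-- Same table as above, but with timestamps read as longs and dates as ints.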
+CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_all_type"),
+   ("decimal-to-double" = "true"),
+   ("timestamp-to-long" = "true"),
+   ("date-to-int" = "true"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp
new file mode 100644
index 0000000..90f80ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT element ds FROM DeltalakeDataset1 as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp
new file mode 100644
index 0000000..b75203b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-all-type/deltalake-all-type.02.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT element ds FROM DeltalakeDataset2 as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp
new file mode 100644
index 0000000..4987403
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.00.ddl.sqlpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+};
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset1(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_all_type"),
+   ("decimal-to-double" = "true"),
+   ("timestamp-to-long"="false"),
+   ("date-to-int"="false"),
+   ("timezone" = "PST"),
+   ("table-format" = "delta")
+ );
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_all_type"),
+   ("decimal-to-double" = "true"),
+   ("timestamp-to-long"="false"),
+   ("date-to-int"="false"),
+   ("timezone" = "PST"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp
new file mode 100644
index 0000000..4180599
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp
new file mode 100644
index 0000000..d300959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp
new file mode 100644
index 0000000..073171c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp
new file mode 100644
index 0000000..d300959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT VALUE d
+FROM DeltalakeDataset1 d
+ORDER BY d.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp
new file mode 100644
index 0000000..fa60442
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp
new file mode 100644
index 0000000..e1ecedd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "false";
+
+EXPLAIN
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp
new file mode 100644
index 0000000..1e440bf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp
new file mode 100644
index 0000000..786618a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+EXPLAIN
+SELECT d1.integer_type, d2.string_type, d2.timestamp_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp
new file mode 100644
index 0000000..d2c9153
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+SELECT d1.integer_type, d1.date_type, d2.decimal_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp
new file mode 100644
index 0000000..bb4701a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+
+EXPLAIN
+SELECT d1.integer_type, d1.date_type, d2.decimal_type
+FROM DeltalakeDataset1 d1, DeltalakeDataset2 d2
+WHERE d1.integer_type = d2.integer_type
+ORDER BY d2.integer_type;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
new file mode 100644
index 0000000..c57de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE DeltalakeTableType as {
+};
+
+CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/s1"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.01.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm
new file mode 100644
index 0000000..9cc21d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-all-type/deltalake-all-type.02.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": 1732010400, "date_type": 127 }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": 1732010400, "date_type": 23 }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": 1732010400, "date_type": 11 }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": 1732010403, "date_type": 456 }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": 1732010401, "date_type": 23 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.01.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan
new file mode 100644
index 0000000..6f01ee7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.02.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- SORT_MERGE_EXCHANGE [$$14(ASC) ]  |PARTITIONED|
+        order (ASC, $$14) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STABLE_SORT [$$14(ASC)]  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            assign [$$14] <- [$$d.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ASSIGN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$d] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm
new file mode 100644
index 0000000..e5f99a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.03.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "string_type": "FirstPerson", "decimal_type": 1.25432, "double_type": 100.34, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-05-08") }
+{ "integer_type": 124, "string_type": "SecondPerson", "decimal_type": 2666.223, "double_type": 200.055, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-24") }
+{ "integer_type": 125, "string_type": "ThirdPerson", "decimal_type": 1245.2421, "double_type": 300.02, "timestamp_type": datetime("1970-01-20T17:06:50.400"), "date_type": date("1970-01-12") }
+{ "integer_type": 126, "string_type": "FourthPerson", "decimal_type": 23731.2, "double_type": 400.21014, "timestamp_type": datetime("1970-01-20T17:06:50.403"), "date_type": date("1971-04-02") }
+{ "integer_type": 127, "string_type": "FifthPerson", "decimal_type": 80911.22245, "double_type": 500.219, "timestamp_type": datetime("1970-01-20T17:06:50.401"), "date_type": date("1970-01-24") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan
new file mode 100644
index 0000000..6f01ee7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.04.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- SORT_MERGE_EXCHANGE [$$14(ASC) ]  |PARTITIONED|
+        order (ASC, $$14) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STABLE_SORT [$$14(ASC)]  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            assign [$$14] <- [$$d.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ASSIGN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$d] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm
new file mode 100644
index 0000000..1f480e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.05.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 125, "string_type": "ThirdPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 126, "string_type": "FourthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.403") }
+{ "integer_type": 127, "string_type": "FifthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.401") }
+{ "integer_type": 124, "string_type": "SecondPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 123, "string_type": "FirstPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan
new file mode 100644
index 0000000..848bc04
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.06.plan
@@ -0,0 +1,50 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"integer_type": $$34, "string_type": $$37, "timestamp_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- SORT_MERGE_EXCHANGE [$$36(ASC) ]  |PARTITIONED|
+            order (ASC, $$36) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STABLE_SORT [$$36(ASC)]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([$$34, $$37, $$38, $$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HYBRID_HASH_JOIN [$$34][$$35]  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
+                        project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$34] <- [$$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$d1] <- test.DeltalakeDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- HASH_PARTITION_EXCHANGE [$$35]  |PARTITIONED|
+                        project ([$$37, $$38, $$36, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$38, $$37, $$36, $$35] <- [$$d2.getField("timestamp_type"), $$d2.getField("string_type"), $$d2.getField("id"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$d2] <- test.DeltalakeDataset2 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm
new file mode 100644
index 0000000..1f480e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.07.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 125, "string_type": "ThirdPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 126, "string_type": "FourthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.403") }
+{ "integer_type": 127, "string_type": "FifthPerson", "timestamp_type": datetime("1970-01-20T17:06:50.401") }
+{ "integer_type": 124, "string_type": "SecondPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
+{ "integer_type": 123, "string_type": "FirstPerson", "timestamp_type": datetime("1970-01-20T17:06:50.400") }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan
new file mode 100644
index 0000000..34671dd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.08.plan
@@ -0,0 +1,50 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"integer_type": $$34, "string_type": $$37, "timestamp_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- SORT_MERGE_EXCHANGE [$$36(ASC) ]  |PARTITIONED|
+            order (ASC, $$36) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STABLE_SORT [$$36(ASC)]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([$$34, $$37, $$38, $$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HYBRID_HASH_JOIN [$$34][$$35]  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
+                        project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$34] <- [$$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$d1] <- test.DeltalakeDataset1 project ({integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- HASH_PARTITION_EXCHANGE [$$35]  |PARTITIONED|
+                        project ([$$37, $$38, $$36, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$38, $$37, $$36, $$35] <- [$$d2.getField("timestamp_type"), $$d2.getField("string_type"), $$d2.getField("id"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$d2] <- test.DeltalakeDataset2 project ({string_type:any,timestamp_type:any,id:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm
new file mode 100644
index 0000000..5e1edce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.09.adm
@@ -0,0 +1,5 @@
+{ "integer_type": 123, "date_type": date("1970-05-08"), "decimal_type": 1.25432 }
+{ "integer_type": 124, "date_type": date("1970-01-24"), "decimal_type": 2666.223 }
+{ "integer_type": 125, "date_type": date("1970-01-12"), "decimal_type": 1245.2421 }
+{ "integer_type": 126, "date_type": date("1971-04-02"), "decimal_type": 23731.2 }
+{ "integer_type": 127, "date_type": date("1970-01-24"), "decimal_type": 80911.22245 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm
new file mode 100644
index 0000000..f706ba2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-field-access-pushdown/deltalake-field-access-pushdown.10.adm
@@ -0,0 +1,46 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"integer_type": $$34, "date_type": $$37, "decimal_type": $$38}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$34, $$37, $$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- SORT_MERGE_EXCHANGE [$$35(ASC) ]  |PARTITIONED|
+            order (ASC, $$35) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STABLE_SORT [$$35(ASC)]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$34, $$35)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$34][$$35]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
+                    project ([$$34, $$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$37, $$34] <- [$$d1.getField("date_type"), $$d1.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$d1] <- test.DeltalakeDataset1 project ({date_type:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$35]  |PARTITIONED|
+                    project ([$$38, $$35]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$38, $$35] <- [$$d2.getField("decimal_type"), $$d2.getField("integer_type")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$d2] <- test.DeltalakeDataset2 project ({decimal_type:any,integer_type:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 7aaa8b2..5ae2fc5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -525,6 +525,18 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-all-type">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-all-type</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-field-access-pushdown">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-field-access-pushdown</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/deltalake-invalid-file-format">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
@@ -532,6 +544,13 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-table-not-exists">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>ASX1108: External source error. io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">common/avro/avro-types/avro-map</output-dir>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
         }
 
         @Override
+        public void beforeExit(boolean success) throws HyracksDataException {
+            // No Op
+        }
+
+        @Override
         public void close() throws IOException {
             // No Op
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
new file mode 100644
index 0000000..9dd2000
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AsterixDeltaRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final HyracksDataException hyracksDataException;
+
+    public AsterixDeltaRuntimeException(HyracksDataException e) {
+        this.hyracksDataException = e;
+    }
+
+    public HyracksDataException getHyracksDataException() {
+        return hyracksDataException;
+    }
+}
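
AsterixDeltaRuntimeException exists to carry a checked HyracksDataException through code, such as the type visitor below, whose interface methods cannot declare checked exceptions; the caller unwraps it at the boundary. A minimal, self-contained sketch of the same wrap-and-unwrap pattern, using hypothetical stand-in types rather than the AsterixDB classes:

    import java.io.IOException;

    public class WrapUnwrapDemo {
        // Stand-in for a visitor interface whose methods cannot throw checked exceptions.
        interface TypeVisitor<R> {
            R visit(String node);
        }

        // Carrier analogous to AsterixDeltaRuntimeException: unchecked outside, checked inside.
        static class CheckedCarrier extends RuntimeException {
            private static final long serialVersionUID = 1L;
            private final IOException checked;

            CheckedCarrier(IOException checked) {
                this.checked = checked;
            }

            IOException getChecked() {
                return checked;
            }
        }

        public static void main(String[] args) {
            TypeVisitor<String> v = node -> {
                if (node.isEmpty()) {
                    // Wrap so the checked exception can cross the visitor boundary.
                    throw new CheckedCarrier(new IOException("empty node"));
                }
                return node.toUpperCase();
            };
            try {
                v.visit("");
            } catch (CheckedCarrier e) {
                IOException unwrapped = e.getChecked(); // back to the checked type at the call site
                System.out.println("unwrapped: " + unwrapped.getMessage());
            }
        }
    }

The carrier adds nothing beyond the wrapped exception, so the original checked type and message survive the round trip intact.
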
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
new file mode 100644
index 0000000..3ca704f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * This visitor clips the fileSchema stored in the Delta table metadata using the expected type.
+ */
+public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, DataType> {
+
+    private final DeltaConverterContext context;
+    private Map<String, FunctionCallInformation> funcInfo;
+
+    public AsterixTypeToDeltaTypeVisitor(DeltaConverterContext context) {
+        this.context = context;
+    }
+
+    public StructType clipType(ARecordType rootType, StructType fileSchema,
+            Map<String, FunctionCallInformation> funcInfo) {
+        if (rootType == EMPTY_TYPE) {
+            return new StructType();
+        } else if (rootType == ALL_FIELDS_TYPE) {
+            return fileSchema;
+        }
+        StructType builder = new StructType();
+        this.funcInfo = funcInfo;
+        return clipObjectChildren(builder, rootType, fileSchema);
+    }
+
+    @Override
+    public DataType visit(ARecordType recordType, DataType arg) {
+        if (isNotCompatibleType(arg, recordType)) {
+            return null;
+        }
+        StructType builder = new StructType();
+        builder = clipObjectChildren(builder, recordType, (StructType) arg);
+        if (builder.fields().size() == 0) {
+            return null;
+        }
+        return builder;
+    }
+
+    @Override
+    public DataType visit(AbstractCollectionType collectionType, DataType arg) {
+        if (isNotCompatibleType(arg, collectionType)) {
+            return null;
+        }
+        DataType elementSchema = ((ArrayType) arg).getElementType();
+        DataType requestedChildType = collectionType.getItemType().accept(this, elementSchema);
+        return new ArrayType(requestedChildType, true);
+    }
+
+    private StructType clipObjectChildren(StructType builder, ARecordType recordType, StructType arg) {
+        String[] fieldNames = recordType.getFieldNames();
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        for (int i = 0; i < fieldNames.length; i++) {
+            // If the field is not present in the file schema, we skip it
+            if (arg.fieldNames().contains(fieldNames[i])) {
+                DataType type = arg.get(fieldNames[i]).getDataType();
+                DataType childType = fieldTypes[i].accept(this, type);
+                if (childType != null) {
+                    // We only add non-MISSING children
+                    builder = builder.add(fieldNames[i], childType);
+                }
+            }
+        }
+        return builder;
+    }
+
+    @Override
+    public DataType visit(AUnionType unionType, DataType arg) {
+        if (arg instanceof ArrayType) {
+            return unionType.getType(ATypeTag.ARRAY).accept(this, arg);
+        } else {
+            return unionType.getType(ATypeTag.OBJECT).accept(this, arg);
+        }
+    }
+
+    @Override
+    public DataType visitFlat(IAType node, DataType arg) {
+        return arg;
+    }
+
+    private boolean isNotCompatibleType(DataType type, IAType node) {
+        // typeName is unique
+        FunctionCallInformation info = funcInfo.get(node.getTypeName());
+        ATypeTag actualType = null;
+        try {
+            actualType = DeltaDataParser.getTypeTag(type, false, context);
+        } catch (HyracksDataException e) {
+            throw new AsterixDeltaRuntimeException(e);
+        }
+        ATypeTag expectedType = node.getTypeTag();
+
+        boolean isNotExpected = actualType != expectedType;
+        if (isNotExpected) {
+            // If no warning is created, the mismatch has already been reported
+            Warning warning = null;
+            if (actualType != ATypeTag.SYSTEM_NULL) {
+                warning = info.createWarning(expectedType, actualType);
+            }
+            if (warning != null) {
+                // New warning seen for the first time; report it.
+                context.getWarnings().add(warning);
+            }
+        }
+        return isNotExpected;
+    }
+}
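
clipType walks the requested record type and the Delta file schema in lockstep, keeping only the fields present in both and recursing into nested records and arrays. A toy sketch of the same recursive clipping over plain maps; the Map-based schema below is purely illustrative and is not Delta Kernel's StructType:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SchemaClipDemo {
        // A "schema" maps field name -> nested Map (record) or a leaf type name (String).
        @SuppressWarnings("unchecked")
        static Map<String, Object> clip(Map<String, Object> requested, Map<String, Object> file) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<String, Object> e : requested.entrySet()) {
                Object fileType = file.get(e.getKey());
                if (fileType == null) {
                    continue; // field absent from the file schema: skip it
                }
                if (e.getValue() instanceof Map && fileType instanceof Map) {
                    Map<String, Object> child = clip((Map<String, Object>) e.getValue(),
                            (Map<String, Object>) fileType);
                    if (!child.isEmpty()) {
                        out.put(e.getKey(), child); // keep only records with surviving children
                    }
                } else {
                    // Leaf: keep the file's type; the real visitor also checks for tag
                    // mismatches here and records a warning.
                    out.put(e.getKey(), fileType);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            Map<String, Object> file = new LinkedHashMap<>();
            file.put("id", "long");
            file.put("name", "string");
            file.put("extra", "string");
            Map<String, Object> requested = new LinkedHashMap<>();
            requested.put("id", "any");
            requested.put("name", "any");
            System.out.println(clip(requested, file)); // {id=long, name=string}
        }
    }
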
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..8dc820b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,23 +19,33 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -43,6 +53,7 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,7 +63,11 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
 
 public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
 
@@ -61,10 +76,9 @@
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
     private static final Logger LOGGER = LogManager.getLogger();
     private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-    private Map<Integer, List<String>> schedule;
     private String scanState;
     private Map<String, String> configuration;
-    private List<String> scanFiles;
+    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -77,16 +91,7 @@
             throws AlgebricksException, HyracksDataException {
         this.configuration = configuration;
         Configuration conf = new Configuration();
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
+        configurationBuilder(configuration, conf);
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -95,26 +100,61 @@
 
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
-        Snapshot snapshot = table.getLatestSnapshot(engine);
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+        Snapshot snapshot;
+        try {
+            snapshot = table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+
+        List<Warning> warnings = new ArrayList<>();
+        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+        StructType requiredSchema;
+        try {
+            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+            Map<String, FunctionCallInformation> functionCallInformationMap =
+                    HDFSUtils.getFunctionCallInformationMap(conf);
+            StructType fileSchema = snapshot.getSchema(engine);
+            requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (AsterixDeltaRuntimeException e) {
+            throw e.getHyracksDataException();
+        }
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
 
-        scanFiles = new ArrayList<>();
+        List<Row> scanFiles = new ArrayList<>();
         while (iter.hasNext()) {
             FilteredColumnarBatch batch = iter.next();
             CloseableIterator<Row> rowIter = batch.getRows();
             while (rowIter.hasNext()) {
                 Row row = rowIter.next();
-                scanFiles.add(RowSerDe.serializeRowToJson(row));
+                scanFiles.add(row);
             }
         }
-        locationConstraints = configureLocationConstraints(appCtx);
+        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles();
+        distributeFiles(scanFiles);
+        issueWarnings(warnings, warningCollector);
     }
 
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+        if (!warnings.isEmpty()) {
+            for (Warning warning : warnings) {
+                if (warningCollector.shouldWarn()) {
+                    warningCollector.warn(warning);
+                }
+            }
+        }
+        warnings.clear();
+    }
+
+    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+            List<Row> scanFiles) {
         IClusterStateManager csm = appCtx.getClusterStateManager();
 
         String[] locations = csm.getClusterLocations().getLocations();
@@ -131,24 +171,47 @@
         return new AlgebricksAbsolutePartitionConstraint(locations);
     }
 
-    private void distributeFiles() {
-        final int numComputePartitions = getPartitionConstraint().getLocations().length;
-        schedule = new HashMap<>();
-        for (int i = 0; i < numComputePartitions; i++) {
-            schedule.put(i, new ArrayList<>());
+    private void distributeFiles(List<Row> scanFiles) {
+        final int partitionsCount = getPartitionConstraint().getLocations().length;
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
         }
-        int i = 0;
-        for (String scanFile : scanFiles) {
-            schedule.get(i).add(scanFile);
-            i = (i + 1) % numComputePartitions;
+        for (Row scanFileRow : scanFiles) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
+            workloadQueue.add(workload);
         }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
+    public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
+        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
+        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
     }
 
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
             int partition = context.getPartition();
-            return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+                    configuration, context);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -169,4 +232,31 @@
         return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
     }
 
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<String> scanFiles = new ArrayList<>();
+        private long totalSize = 0;
+
+        public PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<String> getScanFiles() {
+            return scanFiles;
+        }
+
+        public void addScanFile(String scanFile, long size) {
+            this.scanFiles.add(scanFile);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+        }
+    }
+
 }
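
The round-robin schedule is replaced by greedy, size-aware assignment: each scan file goes to the partition with the smallest accumulated byte count, tracked by a min-heap ordered on getTotalSize. A self-contained sketch of that loop with invented file sizes:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class SizeBalancedSchedulingDemo {
        static final class Workload {
            final List<Long> files = new ArrayList<>();
            long totalSize;
        }

        public static void main(String[] args) {
            int partitions = 3;
            long[] fileSizes = { 900, 100, 800, 200, 700, 300 };

            // Min-heap ordered on accumulated size, one entry per partition.
            PriorityQueue<Workload> queue =
                    new PriorityQueue<>(partitions, Comparator.comparingLong((Workload w) -> w.totalSize));
            for (int i = 0; i < partitions; i++) {
                queue.add(new Workload());
            }
            for (long size : fileSizes) {
                Workload least = queue.poll(); // the partition with the least work so far
                least.files.add(size);
                least.totalSize += size;
                queue.add(least); // reinsert so the heap sees the new total
            }
            for (Workload w : queue) {
                System.out.println("files=" + w.files + " total=" + w.totalSize);
            }
        }
    }

Polling and reinserting after every assignment keeps the heap ordering current, so per-partition totals stay close even when file sizes vary widely, which index-based round-robin cannot guarantee.
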
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..a5b21b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -133,9 +133,13 @@
             scanFile = scanFiles.get(fileIndex);
             fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
-            physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                    physicalReadSchema, Optional.empty());
-            dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            try {
+                physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+                        physicalReadSchema, Optional.empty());
+                dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
             return this.hasNext();
         } else {
             return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
index 81e465c..a404e2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws.delta.converter;
 
 import java.io.DataOutput;
+import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
@@ -32,6 +33,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
 
 public class DeltaConverterContext extends ParserContext {
     @SuppressWarnings("unchecked")
@@ -47,8 +49,10 @@
     private final int timeZoneOffset;
     private final AMutableDate mutableDate = new AMutableDate(0);
     private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+    private final List<Warning> warnings;
 
-    public DeltaConverterContext(Map<String, String> configuration) {
+    public DeltaConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+        this.warnings = warnings;
         decimalToDouble = Boolean.parseBoolean(configuration
                 .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
         timestampAsLong = Boolean.parseBoolean(configuration
@@ -96,4 +100,8 @@
     public boolean isDateAsInt() {
         return dateAsInt;
     }
+
+    public List<Warning> getWarnings() {
+        return warnings;
+    }
 }
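
The warnings list is supplied by the caller, so mismatch warnings raised deep inside type conversion can be recorded where no warning collector is reachable and drained later, as issueWarnings does in AwsS3DeltaReaderFactory above. A sketch of the collect-then-drain pattern with a hypothetical collector interface standing in for IWarningCollector:

    import java.util.ArrayList;
    import java.util.List;

    public class WarningPlumbingDemo {
        // Hypothetical stand-in for Hyracks' IWarningCollector.
        interface WarningCollector {
            boolean shouldWarn();
            void warn(String warning);
        }

        public static void main(String[] args) {
            List<String> warnings = new ArrayList<>(); // shared with the converter context

            // Deep inside conversion, where no collector is available: record, don't report.
            warnings.add("type mismatch: expected bigint, got string");

            WarningCollector collector = new WarningCollector() {
                @Override
                public boolean shouldWarn() {
                    return true; // e.g., enforced by a per-query warning budget
                }

                @Override
                public void warn(String warning) {
                    System.out.println("WARN: " + warning);
                }
            };

            // Later, where the collector is in scope, drain and clear the list.
            for (String w : warnings) {
                if (collector.shouldWarn()) {
                    collector.warn(w);
                }
            }
            warnings.clear();
        }
    }
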
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index ea02d77..adb846d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -23,6 +23,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -42,6 +44,7 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;
 
@@ -68,7 +71,8 @@
     private final IExternalFilterValueEmbedder valueEmbedder;
 
     public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
-        parserContext = new DeltaConverterContext(conf);
+        List<Warning> warnings = new ArrayList<>();
+        parserContext = new DeltaConverterContext(conf, warnings);
         valueEmbedder = context.getValueEmbedder();
     }
 
@@ -91,7 +95,7 @@
         for (int i = 0; i < schema.fields().size(); i++) {
             DataType fieldSchema = schema.fields().get(i).getDataType();
             String fieldName = schema.fieldNames().get(i);
-            ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+            ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i), parserContext);
             IValueReference value = null;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
@@ -120,7 +124,7 @@
         for (int i = 0; i < schema.fields().size(); i++) {
             DataType fieldSchema = schema.fields().get(i).getDataType();
             String fieldName = schema.fieldNames().get(i);
-            ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+            ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index), parserContext);
             IValueReference value = null;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
@@ -157,7 +161,8 @@
         parserContext.exitCollection(valueBuffer, arrayBuilder);
     }
 
-    private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+    public static ATypeTag getTypeTag(DataType schema, boolean isNull, DeltaConverterContext parserContext)
+            throws HyracksDataException {
         if (isNull) {
             return ATypeTag.NULL;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 1d5f2ed..f06638d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
 import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 import static org.msgpack.core.MessagePack.Code.ARRAY16;
 
 import java.io.ByteArrayOutputStream;
@@ -71,6 +72,7 @@
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -109,6 +111,12 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 
 public class ExternalDataUtils {
 
@@ -117,6 +125,8 @@
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
     static {
         valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -505,6 +515,26 @@
         }
     }
 
+    public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
+        Configuration conf = new Configuration();
+        String tableMetadataPath = null;
+        if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+            AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                    + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                    + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        }
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        try {
+            table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+    }
+
     public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
             String tableMetadataPath) throws AlgebricksException {
         HadoopTables tables = new HadoopTables(conf);
@@ -936,8 +966,8 @@
     }
 
     public static boolean supportsPushdown(Map<String, String> properties) {
-        //Currently, only Apache Parquet format is supported
-        return isParquetFormat(properties);
+        // Currently, only the Apache Parquet and Delta table formats are supported
+        return isParquetFormat(properties) || isDeltaTable(properties);
     }
 
     /**
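
validateDeltaTableExists probes the table once while the statement is being compiled, so a bad path fails immediately with ASX1108 instead of surfacing as a runtime failure on every partition. The shape of the pattern, with hypothetical stand-ins for the Delta Kernel calls:

    public class FailFastValidationDemo {
        // Stand-in for Delta Kernel's unchecked KernelException.
        static class VendorException extends RuntimeException {
            VendorException(String message) {
                super(message);
            }
        }

        // Stand-in for the compiler-side checked exception type.
        static class CompilationException extends Exception {
            CompilationException(String message, Throwable cause) {
                super(message, cause);
            }
        }

        // Stand-in for table.getLatestSnapshot(engine): throws if the table is missing.
        static void probeTable(String path) {
            if (!path.startsWith("s3a://")) {
                throw new VendorException("Delta table at path `" + path + "` is not found.");
            }
        }

        static void validate(String path) throws CompilationException {
            try {
                probeTable(path); // result is discarded; only reachability matters
            } catch (VendorException e) {
                throw new CompilationException("External source error. " + e.getMessage(), e);
            }
        }

        public static void main(String[] args) throws CompilationException {
            validate("s3a://playground/delta-data/tbl"); // passes; a bad path would fail here
        }
    }
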
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..f36d25d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -24,6 +24,7 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -468,5 +469,8 @@
         if (!response.sdkHttpResponse().isSuccessful()) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableExists(configuration);
+        }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 6314ce8..2613f34 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -45,4 +45,16 @@
     public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
     public static final String HADOOP_MAX_REQUESTS_PER_BATCH = "fs.gs.max.requests.per.batch";
     public static final String HADOOP_BATCH_THREADS = "fs.gs.batch.threads";
+
+    public static class JSON_CREDENTIALS_FIELDS {
+        public static final String PRIVATE_KEY_ID = "private_key_id";
+        public static final String PRIVATE_KEY = "private_key";
+        public static final String CLIENT_EMAIL = "client_email";
+    }
+
+    public static class HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS {
+        public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
+        public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
+        public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 5274c44..74a664d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -25,8 +25,6 @@
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
@@ -37,7 +35,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +56,10 @@
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.gax.paging.Page;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.BaseServiceException;
@@ -73,6 +74,11 @@
 
     }
 
+    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
+    static {
+        JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+    }
+
     /**
      * Builds the client using the provided configuration
      *
@@ -218,7 +224,8 @@
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions) {
+    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
+            throws AlgebricksException {
         String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
         String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 
@@ -234,15 +241,25 @@
         //        conf.set(GCSConstants.HADOOP_BATCH_THREADS, String.valueOf(numberOfPartitions));
 
         // authentication method
-        // TODO(htowaileb): find a way to pass the content instead of the path to keyfile, this line is temporary
-        Path credentials = Path.of("credentials.json");
         if (jsonCredentials == null) {
             // anonymous access
             conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
         } else {
-            // TODO(htowaileb) need to pass the file content
-            conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE);
-            conf.set(HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH, credentials.toAbsolutePath().toString());
+            try {
+                JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
+                // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
+                // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x; it is removed in
+                // version 3.x.y, which also removed support for hadoop-2.
+                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY_ID,
+                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY_ID).asText());
+                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY,
+                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY).asText());
+                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.CLIENT_EMAIL,
+                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.CLIENT_EMAIL).asText());
+            } catch (JsonProcessingException e) {
+                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
+                        getMessageOrToString(e));
+            }
         }
 
         // set endpoint if provided, default is https://storage.googleapis.com/
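
The keyfile-path workaround is gone: the service-account JSON is now parsed in memory and the three fields Hadoop needs are copied into the job configuration. ALLOW_UNESCAPED_CONTROL_CHARS matters because pasted private keys frequently contain raw newlines. A minimal sketch with Jackson; the conf keys mirror the constants added in GCSConstants, and the credential values are fake:

    import java.util.HashMap;
    import java.util.Map;

    import com.fasterxml.jackson.core.json.JsonReadFeature;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class GcsCredentialsDemo {
        private static final ObjectMapper MAPPER = new ObjectMapper();
        static {
            // Pasted service-account JSON often carries raw newlines inside the private key.
            MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
        }

        static Map<String, String> toHadoopConf(String jsonCredentials) throws Exception {
            JsonNode creds = MAPPER.readTree(jsonCredentials);
            Map<String, String> conf = new HashMap<>();
            conf.put("fs.gs.auth.service.account.private.key.id", creds.get("private_key_id").asText());
            conf.put("fs.gs.auth.service.account.private.key", creds.get("private_key").asText());
            conf.put("fs.gs.auth.service.account.email", creds.get("client_email").asText());
            return conf;
        }

        public static void main(String[] args) throws Exception {
            String json = "{\"private_key_id\":\"abc\","
                    + "\"private_key\":\"-----BEGIN PRIVATE KEY-----\\n...\","
                    + "\"client_email\":\"svc@example.iam.gserviceaccount.com\"}";
            System.out.println(toHadoopConf(json));
        }
    }
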
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 072a9b6..9b8b103 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -58,6 +60,7 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -95,12 +98,13 @@
     private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
 
     private final IFrameOperationCallback[] frameOpCallbacks;
-    private boolean flushedPartialTuples;
     private final PermutingFrameTupleReference keyTuple;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
     private final IntSet processedTuples = new IntOpenHashSet();
     private final IntSet flushedTuples = new IntOpenHashSet();
     private final SourceLocation sourceLoc;
+    private boolean flushedPartialTuples;
+    private IBatchController batchController;
 
     public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
@@ -284,6 +288,7 @@
             searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             frameTuple = new FrameTupleReference();
+            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
@@ -305,7 +310,8 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
 
         writeBuffer.ensureFrameSize(buffer.capacity());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8bc2b1c..dd9f4070d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,6 +73,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -140,11 +143,12 @@
     private final ITracer tracer;
     private final long traceCategory;
     private final ITupleProjector tupleProjector;
-    private long lastRecordInTimeStamp = 0L;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
     private final boolean hasSecondaries;
     private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
     private final int[] fieldPermutation;
+    private long lastRecordInTimeStamp = 0L;
+    private IBatchController batchController;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -362,6 +366,11 @@
                 }
 
                 @Override
+                public void beforeExit(boolean success) throws HyracksDataException {
+                    callback.beforeExit(success);
+                }
+
+                @Override
                 public void close() throws IOException {
                     callback.close();
                 }
@@ -378,6 +387,7 @@
             };
             frameOpCallbacks[i].open();
         }
+        batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
     }
 
     protected void resetSearchPredicate(int tupleIndex) {
@@ -437,7 +447,8 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
@@ -624,4 +635,5 @@
             }
         }
     }
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+    static final IBatchController INSTANCE = new StandardBatchController();
+
+    private StandardBatchController() {
+    }
+
+    @Override
+    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException {
+        lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+    }
+
+    @Override
+    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException {
+        lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+    }
+}
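
For contrast with the standard controller above, here is a minimal sketch of an alternative IBatchController that holds the component across batches and exits only when its owner decides. The class, its finish() hook, and the idea of deferring the exit are hypothetical illustrations under the IBatchController contract shown in this change, not part of the patch:

    package org.apache.asterix.runtime.operators;

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
    import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
    import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;

    // Hypothetical: enter the component on the first batch only, and keep it
    // entered across subsequent batches so they become visible together.
    class DeferredExitBatchController implements IBatchController {
        private boolean entered = false;

        @Override
        public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness,
                IFrameOperationCallback callback) throws HyracksDataException {
            if (!entered) {
                lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
                entered = true;
            }
        }

        @Override
        public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness,
                IFrameOperationCallback callback, boolean batchSuccessful) throws HyracksDataException {
            // exit immediately on failure; on success, hold the component until finish()
            if (!batchSuccessful && entered) {
                lsmHarness.exit(ctx, callback, false, LSMOperationType.MODIFICATION);
                entered = false;
            }
        }

        // Hypothetical hook the owning operator would call after the last batch.
        void finish(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
                throws HyracksDataException {
            if (entered) {
                lsmHarness.exit(ctx, callback, true, LSMOperationType.MODIFICATION);
                entered = false;
            }
        }
    }
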
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 2413ed2..f8fb6c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -181,17 +181,24 @@
     }
 
     public static void substituteVarRec(AbstractLogicalOperator op, LogicalVariable v1, LogicalVariable v2,
-            boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+            boolean goThroughNts, ITypingContext ctx, Set<ILogicalOperator> visited) throws AlgebricksException {
         VariableUtilities.substituteVariables(op, v1, v2, goThroughNts, ctx);
         for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
-            substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx);
+            if (visited.contains(opRef2.getValue())) {
+                continue;
+            }
+            visited.add(opRef2.getValue());
+            substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx, visited);
         }
         if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE && goThroughNts) {
             NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
             if (nts.getDataSourceReference() != null) {
                 AbstractLogicalOperator op2 =
                         (AbstractLogicalOperator) nts.getDataSourceReference().getValue().getInputs().get(0).getValue();
-                substituteVarRec(op2, v1, v2, goThroughNts, ctx);
+                if (!visited.contains(op2)) {
+                    visited.add(op2);
+                    substituteVarRec(op2, v1, v2, goThroughNts, ctx, visited);
+                }
             }
         }
         if (op.hasNestedPlans()) {
@@ -199,7 +206,11 @@
             for (ILogicalPlan p : aonp.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> ref : p.getRoots()) {
                     AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
-                    substituteVarRec(aop, v1, v2, goThroughNts, ctx);
+                    if (visited.contains(aop)) {
+                        continue;
+                    }
+                    visited.add(aop);
+                    substituteVarRec(aop, v1, v2, goThroughNts, ctx, visited);
                 }
             }
         }
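
The visited set threaded through substituteVarRec exists because logical plans can share operator instances; tracking visited operators keeps the recursion from re-walking shared subtrees. A caller-side sketch of the new contract, where rootOp, context, and the substitution pairs are hypothetical stand-ins for the caller's state:

    // The caller owns the visited set; clearing it between independent
    // substitutions lets each (oldVar -> newVar) pass traverse the whole plan.
    Set<ILogicalOperator> visited = new HashSet<>();
    for (Pair<LogicalVariable, LogicalVariable> s : substitutions) { // hypothetical pairs
        OperatorManipulationUtil.substituteVarRec(rootOp, s.first, s.second, true, context, visited);
        visited.clear();
    }
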
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
index 55c8400..dbdf6c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
@@ -92,6 +92,7 @@
     protected void buildVarExprList(Collection<LogicalVariable> vars, IOptimizationContext context, GroupByOperator g,
             List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
         SourceLocation sourceLoc = g.getSourceLocation();
+        Set<ILogicalOperator> visited = new HashSet<>();
         for (LogicalVariable ov : vars) {
             LogicalVariable newVar = context.newVar();
             VariableReferenceExpression varExpr = new VariableReferenceExpression(newVar);
@@ -101,7 +102,8 @@
             for (ILogicalPlan p : g.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
-                            context);
+                            context, visited);
+                    visited.clear();
                 }
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
index 391d03a..36cad77 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
@@ -331,6 +331,7 @@
             List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
         SourceLocation sourceLoc = g.getSourceLocation();
         Map<LogicalVariable, LogicalVariable> m = new HashMap<LogicalVariable, LogicalVariable>();
+        Set<ILogicalOperator> visited = new HashSet<>();
         for (LogicalVariable ov : vars) {
             LogicalVariable newVar = context.newVar();
             ILogicalExpression varExpr = new VariableReferenceExpression(newVar);
@@ -340,11 +341,13 @@
             for (ILogicalPlan p : g.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
-                            context);
+                            context, visited);
+                    visited.clear();
                 }
             }
             AbstractLogicalOperator opUnder = (AbstractLogicalOperator) g.getInputs().get(0).getValue();
-            OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context);
+            OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context, visited);
+            visited.clear();
             m.put(ov, newVar);
         }
         return m;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
      * @param writer the FrameWriter to write to and flush
      * @throws HyracksDataException
      */
-    public default void flush(IFrameWriter writer) throws HyracksDataException {
+    default void flush(IFrameWriter writer) throws HyracksDataException {
         write(writer, true);
         writer.flush();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+    void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 3ed17b1..ad5e919 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -288,6 +288,42 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+    public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+            throws HyracksDataException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (HyracksThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) {
         Throwable savedT = null;
         try {
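
A usage sketch for the new helper: because HyracksThrowingAction is a functional interface, the action and the cleanups can be lambdas or method references. The appender, cursor, and writer below are hypothetical:

    // Runs the action, then every cleanup in order; the first Throwable wins and
    // is rethrown (wrapped as a HyracksDataException unless it is an Error), with
    // later cleanup failures attached as suppressed exceptions.
    InvokeUtil.tryHyracksWithCleanups(
            () -> appender.write(writer, true), // hypothetical action
            cursor::close,                      // hypothetical cleanup 1
            writer::flush);                     // hypothetical cleanup 2
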
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
-
-    /*
-     * Always write and then flush to send out the message if exists
-     */
-    @Override
-    public void flush(IFrameWriter writer) throws HyracksDataException {
-        write(writer, true);
-        writer.flush();
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
         Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
         return sharedMap == null ? null : (T) sharedMap.get(key);
     }
+
+    /**
+     * Gets an object of type T from the shared map of the task, or returns the supplied
+     * default value if the shared map does not exist or contains no mapping for the key.
+     *
+     * @param key the key to look up in the shared map
+     * @param ctx the task context that owns the shared map
+     * @param defaultValue the value to return when no mapping exists
+     * @return the value associated with the key, cast to T, or defaultValue
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+        Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+        return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+    }
 }
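
The following sketch shows how getOrDefault pairs with the batch-controller lookup in the insert/upsert operators above. It assumes getSharedMap(ctx, true) creates the shared map on demand; the custom controller instance is hypothetical:

    // Producer side: install a non-default controller in the task's shared map.
    Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, true); // assumes create-on-demand
    sharedMap.put(IBatchController.KEY_BATCH_CONTROLLER, myBatchController); // hypothetical instance

    // Consumer side (as in the operators' open()):
    IBatchController controller =
            TaskUtil.getOrDefault(IBatchController.KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
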
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+    String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+    void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException;
+
+    void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
 public interface IFrameOperationCallback extends Closeable {
     /**
      * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
-     * the pipeline
+     * the pipeline. If completing this frame will also exit the component, this is called
+     * prior to {@link #beforeExit(boolean)}.
      *
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
 
     /**
-     * Called when the task has failed.
+     * Called just prior to exiting the component on batch completion. Note that not every batch
+     * results in a component exit; whether one occurs is decided by the {@link IBatchController}.
+     *
+     * @param success whether the batch completed successfully
+     * @throws HyracksDataException
+     */
+    void beforeExit(boolean success) throws HyracksDataException;
+
+    /**
+     * Called when batch processing, or an invocation of {@link #frameCompleted()} or
+     * {@link #beforeExit(boolean)}, has failed.
      *
      * @param th
      */
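
Because beforeExit is added to IFrameOperationCallback without a default body, every existing implementation must now supply it; delegating wrappers, like the anonymous callback in LSMPrimaryUpsertOperatorNodePushable above, simply forward the call. A minimal no-op sketch, with the method set and signatures taken from the usages visible in this change:

    IFrameOperationCallback callback = new IFrameOperationCallback() {
        @Override
        public void open() throws HyracksDataException {
            // nothing to initialize
        }

        @Override
        public void frameCompleted() throws HyracksDataException {
            // no per-frame work
        }

        @Override
        public void beforeExit(boolean success) throws HyracksDataException {
            // no exit-time work; success reflects whether the batch processed cleanly
        }

        @Override
        public void fail(Throwable th) {
            // nothing to roll back
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    };
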
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 214d9dc..dbf34c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -227,10 +227,17 @@
      *            the callback at the end of the frame
      * @param tuples
      *            the indexes of tuples to process
+     * @param batchController
+     *            the controller of the batch lifecycle
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException;
+
+    void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+    void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
             throws HyracksDataException;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2d04020..d019a08 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -692,15 +693,27 @@
         lsmIndex.updateFilter(ctx, tuple);
     }
 
-    private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+    @Override
+    public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {
             lsmIndex.allocateMemoryComponents();
         }
-        getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+        getAndEnterComponents(ctx, op, false);
     }
 
-    private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
-        getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+    @Override
+    public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+            LSMOperationType op) throws HyracksDataException {
+        try {
+            callback.beforeExit(success);
+        } catch (Throwable th) {
+            // TODO(mblow): we don't distinguish between the three distinct phases in which the
+            //              callback API can encounter failures; we might need to eventually
+            callback.fail(th);
+            throw th;
+        } finally {
+            getAndExitComponentsAndComplete(ctx, op);
+        }
     }
 
     private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -715,13 +728,15 @@
 
     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
-            throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException {
         processor.start();
-        enter(ctx);
+        batchController.batchEnter(ctx, this, frameOpCallback);
+        boolean success = false;
         try {
             try {
                 processFrame(accessor, tuple, processor, tuples);
+                success = true;
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -733,7 +748,7 @@
             LOGGER.warn("Failed to process frame", e);
             throw e;
         } finally {
-            exit(ctx);
+            batchController.batchExit(ctx, this, frameOpCallback, success);
             ctx.logPerformanceCounters(accessor.getTupleCount());
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fb5984d..c768768 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -211,8 +212,9 @@
     }
 
     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
-            IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
+            IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples)
+            throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples);
     }
 
     @Override