[ASTERIXDB-3576][EXT] push predicates down to delta tables to filter row groups

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Delta table's data files are essentially Parquet files. Parquet allows
applying a predicate while reading data files to skip row groups.
With this patch we pushdown filters to individual parquet files of the
Delta table to filter row groups. The Predicate class of the Delta Kernel API
is not serializable, so we have added a custom serialization/de-serialization
of Delta kernel APIs Predicates.

Ext-ref: MB-65315
Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a094c22..121a76b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -41,6 +44,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.types.StructType;
@@ -65,9 +69,10 @@
     private int fileIndex;
     private Row scanFile;
     private CloseableIterator<Row> rows;
+    private Optional<Predicate> filterPredicate;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config)
-            throws HyracksDataException {
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
+            String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -85,15 +90,16 @@
             this.scanFile = scanFiles.get(0);
             this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
             try {
                 this.physicalDataIter = engine.getParquetHandler()
-                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
                 this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
             }
         }
     }
@@ -122,7 +128,7 @@
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
                 physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, Optional.empty());
+                        physicalReadSchema, filterPredicate);
                 dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 4e902b9..b76dd4d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -79,6 +79,7 @@
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
+    private String filterExpressionStr;
 
     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -133,8 +134,14 @@
         if (filterExpression != null) {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema)
                     .withFilter(engine, (Predicate) filterExpression).build();
+            if (scan.getRemainingFilter().isPresent()) {
+                filterExpressionStr = PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
+            } else {
+                filterExpressionStr = null;
+            }
         } else {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
@@ -145,6 +152,7 @@
             // We need to fall back to skip applying the filter and return all files.
             LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage());
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
@@ -206,7 +214,7 @@
         try {
             int partition = context.getPartition();
             return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory);
+                    confFactory, filterExpressionStr);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
new file mode 100644
index 0000000..efd47f5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Utility class to serialize and deserialize {@link Predicate} object.
+ */
+public class PredicateSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private PredicateSerDe() {
+    }
+
+    public static String serializeExpressionToJson(Expression expression) {
+        Map<String, Object> expressionObject = visitExpression(expression);
+        try {
+            return OBJECT_MAPPER.writeValueAsString(expressionObject);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) {
+        try {
+            if (jsonExpression == null) {
+                return Optional.empty();
+            }
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression);
+            return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode));
+        } catch (JsonProcessingException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public static Map<String, Object> visitPredicate(Predicate predicate) {
+        Map<String, Object> predicateObject = new HashMap<>();
+        predicateObject.put("type", "predicate");
+        predicateObject.put("name", predicate.getName());
+        predicateObject.put("left", visitExpression(predicate.getChildren().get(0)));
+        predicateObject.put("right", visitExpression(predicate.getChildren().get(1)));
+        return predicateObject;
+    }
+
+    public static Map<String, Object> visitLiteral(Literal literal) {
+        Map<String, Object> literalObject = new HashMap<>();
+        literalObject.put("type", "literal");
+        literalObject.put("dataType", literal.getDataType().toString());
+        literalObject.put("value", literal.getValue());
+        return literalObject;
+    }
+
+    public static Map<String, Object> visitColumn(Column column) {
+        Map<String, Object> columnObject = new HashMap<>();
+        columnObject.put("type", "column");
+        columnObject.put("names", column.getNames());
+        return columnObject;
+    }
+
+    private static Map<String, Object> visitExpression(Expression expression) {
+        return switch (expression) {
+            case Predicate predicate -> visitPredicate(predicate);
+            case Column column -> visitColumn(column);
+            case Literal literal -> visitLiteral(literal);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + expression);
+        };
+    }
+
+    public static Predicate visitPredicate(ObjectNode node) {
+        return new Predicate(node.get("name").asText(), visitExpression((ObjectNode) node.get("left")),
+                visitExpression((ObjectNode) node.get("right")));
+    }
+
+    public static Literal visitLiteral(ObjectNode node) {
+        switch (node.get("dataType").asText()) {
+            case "boolean" : return Literal.ofBoolean(node.get("value").asBoolean());
+            case "byte" : return Literal.ofByte((byte) node.get("value").asInt());
+            case "short" : return Literal.ofShort(node.get("value").shortValue());
+            case "integer" : return Literal.ofInt(node.get("value").asInt());
+            case "long" : return Literal.ofLong(node.get("value").asLong());
+            case "float" : return Literal.ofFloat(node.get("value").floatValue());
+            case "double" : return Literal.ofDouble(node.get("value").doubleValue());
+            case "date" : return Literal.ofDate(node.get("value").asInt());
+            case "timestamp" : return Literal.ofTimestamp(node.get("value").asLong());
+            case "string" : return Literal.ofString(node.get("value").asText());
+            case null, default : throw new UnsupportedOperationException("Unsupported literal type: " + node.get("dataType").asText());
+        }
+    }
+
+    public static Column visitColumn(ObjectNode node) {
+        if (node.get("names").isArray()) {
+            return new Column(StreamSupport.stream(node.get("names").spliterator(), false).map(JsonNode::asText)
+                    .toArray(String[]::new));
+        } else {
+            return new Column(node.get("names").asText());
+        }
+    }
+
+    private static Expression visitExpression(ObjectNode node) {
+        return switch (node.get("type").asText()) {
+            case "predicate" -> visitPredicate(node);
+            case "column" -> visitColumn(node);
+            case "literal" -> visitLiteral(node);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + node.get("type").asText());
+        };
+    }
+}