[ASTERIXDB-3503][EXT] Add column filter for Delta Reader.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Read only the required columns from the Delta table by clipping the table schema to the requested fields, and enable projection pushdown for Delta tables.

Ext-ref: MB-63840

Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
new file mode 100644
index 0000000..2c86132
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Clips the schema stored in the Delta table metadata to the fields requested by the expected type, so that
+ * only the required columns are read. In the visit methods a {@code null} result stands for MISSING: the
+ * parent drops the corresponding field.
+ */
+public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, DataType> {
+
+    private final DeltaConverterContext context;
+    // Assigned by clipType() before visiting; looked up by type name when a type mismatch is reported
+    private Map<String, FunctionCallInformation> funcInfo;
+
+    /**
+     * @param context context handed to the type-tag lookup; its warning list receives the mismatch warnings
+     */
+    public AsterixTypeToDeltaTypeVisitor(DeltaConverterContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Clips the table schema to the requested fields.
+     *
+     * @param rootType   expected (requested) type of the root record
+     * @param fileSchema schema of the Delta table
+     * @param funcInfo   function call information keyed by type name, used to create mismatch warnings
+     * @return an empty schema for {@code EMPTY_TYPE}, {@code fileSchema} itself for {@code ALL_FIELDS_TYPE},
+     *         otherwise a schema with only the requested fields that {@code fileSchema} contains
+     */
+    public StructType clipType(ARecordType rootType, StructType fileSchema,
+            Map<String, FunctionCallInformation> funcInfo) {
+        if (rootType == EMPTY_TYPE) {
+            return new StructType();
+        } else if (rootType == ALL_FIELDS_TYPE) {
+            return fileSchema;
+        }
+        this.funcInfo = funcInfo;
+        return clipObjectChildren(new StructType(), rootType, fileSchema);
+    }
+
+    /**
+     * @return the clipped struct, or {@code null} if {@code arg} does not match the expected object type or
+     *         none of the requested fields was kept
+     */
+    @Override
+    public DataType visit(ARecordType recordType, DataType arg) {
+        if (isNotCompatibleType(arg, recordType)) {
+            return null;
+        }
+        StructType clipped = clipObjectChildren(new StructType(), recordType, (StructType) arg);
+        return clipped.fields().isEmpty() ? null : clipped;
+    }
+
+    /**
+     * @return an array of the clipped element type, or {@code null} if {@code arg} does not match the expected
+     *         collection type or nothing of the element type was kept
+     */
+    @Override
+    public DataType visit(AbstractCollectionType collectionType, DataType arg) {
+        if (isNotCompatibleType(arg, collectionType)) {
+            return null;
+        }
+        DataType elementSchema = ((ArrayType) arg).getElementType();
+        DataType requestedChildType = collectionType.getItemType().accept(this, elementSchema);
+        if (requestedChildType == null) {
+            // The element was clipped away entirely; never wrap a null element type in an ArrayType
+            return null;
+        }
+        return new ArrayType(requestedChildType, true);
+    }
+
+    /**
+     * Adds to {@code builder} every field of {@code recordType} that exists in {@code arg} and whose clipped
+     * type is not {@code null}.
+     */
+    private StructType clipObjectChildren(StructType builder, ARecordType recordType, StructType arg) {
+        String[] fieldNames = recordType.getFieldNames();
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        for (int i = 0; i < fieldNames.length; i++) {
+            String fieldName = fieldNames[i];
+            if (!arg.fieldNames().contains(fieldName)) {
+                // If the field is not present in the file schema, we skip it
+                continue;
+            }
+            DataType childType = fieldTypes[i].accept(this, arg.get(fieldName).getDataType());
+            if (childType != null) {
+                // We only add non-MISSING children
+                builder = builder.add(fieldName, childType);
+            }
+        }
+        return builder;
+    }
+
+    /**
+     * Dispatches to the union's array member when {@code arg} is an array and to its object member otherwise.
+     *
+     * @return the clipped type, or {@code null} if the union has no such member
+     */
+    @Override
+    public DataType visit(AUnionType unionType, DataType arg) {
+        ATypeTag memberTag = arg instanceof ArrayType ? ATypeTag.ARRAY : ATypeTag.OBJECT;
+        IAType member = unionType.getType(memberTag);
+        if (member == null) {
+            return null;
+        }
+        return member.accept(this, arg);
+    }
+
+    /** Flat types are not clipped: the file's type is kept as is. */
+    @Override
+    public DataType visitFlat(IAType node, DataType arg) {
+        return arg;
+    }
+
+    /**
+     * Compares the type tag of the Delta type with the tag of the expected type and, on a mismatch, adds a
+     * warning to the context unless the actual tag is SYSTEM_NULL or no new warning is produced.
+     *
+     * @return {@code true} if the tags differ
+     */
+    private boolean isNotCompatibleType(DataType type, IAType node) {
+        ATypeTag actualType;
+        try {
+            actualType = DeltaDataParser.getTypeTag(type, false, context);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException("Unable to determine the type tag of Delta type " + type, e);
+        }
+        ATypeTag expectedType = node.getTypeTag();
+        if (actualType == expectedType) {
+            return false;
+        }
+        if (actualType != ATypeTag.SYSTEM_NULL) {
+            // typeName is unique; the map or the entry may be absent, in which case nothing is reported
+            FunctionCallInformation info = funcInfo == null ? null : funcInfo.get(node.getTypeName());
+            // If no warning is created, then it means it has been reported
+            Warning warning = info == null ? null : info.createWarning(expectedType, actualType);
+            if (warning != null) {
+                // New warning that we saw for the first time. We should report it.
+                context.getWarnings().add(warning);
+            }
+        }
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..5ce2c78 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,8 +35,12 @@
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -43,6 +48,7 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,6 +58,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
 
 public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
@@ -87,6 +94,10 @@
         if (serviceEndpoint != null) {
             conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
         }
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -96,7 +107,21 @@
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
         Snapshot snapshot = table.getLatestSnapshot(engine);
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+
+        List<Warning> warnings = new ArrayList<>();
+        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+        StructType requiredSchema;
+        try {
+            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+            Map<String, FunctionCallInformation> functionCallInformationMap =
+                    HDFSUtils.getFunctionCallInformationMap(conf);
+            StructType fileSchema = snapshot.getSchema(engine);
+            requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
 
@@ -112,6 +137,18 @@
         locationConstraints = configureLocationConstraints(appCtx);
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
         distributeFiles();
+        issueWarnings(warnings, warningCollector);
+    }
+
+    /** Reports each warning the collector still accepts, then empties the list. */
+    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+        // shouldWarn() is asked exactly once per warning; an extra up-front probe could use up a budget slot
+        for (Warning warning : warnings) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(warning);
+            }
+        }
+        warnings.clear();
     }
 
     private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
index 81e465c..a404e2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws.delta.converter;
 
 import java.io.DataOutput;
+import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
@@ -32,6 +33,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
 
 public class DeltaConverterContext extends ParserContext {
     @SuppressWarnings("unchecked")
@@ -47,8 +49,10 @@
     private final int timeZoneOffset;
     private final AMutableDate mutableDate = new AMutableDate(0);
     private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+    private final List<Warning> warnings;
 
-    public DeltaConverterContext(Map<String, String> configuration) {
+    public DeltaConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+        this.warnings = warnings;
         decimalToDouble = Boolean.parseBoolean(configuration
                 .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
         timestampAsLong = Boolean.parseBoolean(configuration
@@ -96,4 +100,8 @@
     public boolean isDateAsInt() {
         return dateAsInt;
     }
+
+    public List<Warning> getWarnings() {
+        return warnings; // the list instance given to the constructor, not a copy
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index ea02d77..adb846d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -23,6 +23,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -42,6 +44,7 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;
 
@@ -68,7 +71,8 @@
     private final IExternalFilterValueEmbedder valueEmbedder;
 
     public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
-        parserContext = new DeltaConverterContext(conf);
+        List<Warning> warnings = new ArrayList<>(); // not stored here; exposed by parserContext.getWarnings()
+        parserContext = new DeltaConverterContext(conf, warnings);
         valueEmbedder = context.getValueEmbedder();
     }
 
@@ -91,7 +95,7 @@
         for (int i = 0; i < schema.fields().size(); i++) {
             DataType fieldSchema = schema.fields().get(i).getDataType();
             String fieldName = schema.fieldNames().get(i);
-            ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+            ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i), parserContext);
             IValueReference value = null;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
@@ -120,7 +124,7 @@
         for (int i = 0; i < schema.fields().size(); i++) {
             DataType fieldSchema = schema.fields().get(i).getDataType();
             String fieldName = schema.fieldNames().get(i);
-            ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+            ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index), parserContext);
             IValueReference value = null;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
@@ -157,7 +161,8 @@
         parserContext.exitCollection(valueBuffer, arrayBuilder);
     }
 
-    private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+    public static ATypeTag getTypeTag(DataType schema, boolean isNull, DeltaConverterContext parserContext)
+            throws HyracksDataException {
         if (isNull) {
             return ATypeTag.NULL;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index bc5b8c3..50e72ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -922,7 +922,7 @@
 
     public static boolean supportsPushdown(Map<String, String> properties) {
-        //Currently, only Apache Parquet format is supported
-        return isParquetFormat(properties);
+        //Currently, only Apache Parquet format and Delta tables are supported
+        return isParquetFormat(properties) || isDeltaTable(properties);
     }
 
     /**