[ASTERIXDB-3503][EXT] Add column filter for Delta Reader.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Reading only the required columns from the delta table.
Ext-ref: MB-63840
Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
new file mode 100644
index 0000000..2c86132
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * This visitor clips the filesSchema stored in Delta table metadata using the expected type
+ */
+public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, DataType> {
+
+ private final DeltaConverterContext context;
+ private Map<String, FunctionCallInformation> funcInfo;
+
+ public AsterixTypeToDeltaTypeVisitor(DeltaConverterContext context) {
+ this.context = context;
+ }
+
+ public StructType clipType(ARecordType rootType, StructType fileSchema,
+ Map<String, FunctionCallInformation> funcInfo) {
+ if (rootType == EMPTY_TYPE) {
+ return new StructType();
+ } else if (rootType == ALL_FIELDS_TYPE) {
+ return fileSchema;
+ }
+ StructType builder = new StructType();
+ this.funcInfo = funcInfo;
+ return clipObjectChildren(builder, rootType, fileSchema);
+ }
+
+ @Override
+ public DataType visit(ARecordType recordType, DataType arg) {
+ if (isNotCompatibleType(arg, recordType)) {
+ return null;
+ }
+ StructType builder = new StructType();
+ builder = clipObjectChildren(builder, recordType, (StructType) arg);
+ if (builder.fields().size() == 0) {
+ return null;
+ }
+ return builder;
+ }
+
+ @Override
+ public DataType visit(AbstractCollectionType collectionType, DataType arg) {
+ if (isNotCompatibleType(arg, collectionType)) {
+ return null;
+ }
+ DataType elementSchema = ((ArrayType) arg).getElementType();
+ DataType requestedChildType = collectionType.getItemType().accept(this, elementSchema);
+ return new ArrayType(requestedChildType, true);
+ }
+
+ private StructType clipObjectChildren(StructType builder, ARecordType recordType, StructType arg) {
+ String[] fieldNames = recordType.getFieldNames();
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ // If the field is not present in the file schema, we skip it
+ if (arg.fieldNames().contains(fieldNames[i])) {
+ DataType type = arg.get(fieldNames[i]).getDataType();
+ DataType childType = fieldTypes[i].accept(this, type);
+ if (childType != null) {
+ // We only add non-MISSING children
+ builder = builder.add(fieldNames[i], childType);
+ }
+ }
+ }
+ return builder;
+ }
+
+ @Override
+ public DataType visit(AUnionType unionType, DataType arg) {
+ if (arg instanceof ArrayType) {
+ return unionType.getType(ATypeTag.ARRAY).accept(this, arg);
+ } else {
+ return unionType.getType(ATypeTag.OBJECT).accept(this, arg);
+ }
+ }
+
+ @Override
+ public DataType visitFlat(IAType node, DataType arg) {
+ return arg;
+ }
+
+ private boolean isNotCompatibleType(DataType type, IAType node) {
+ // typeName is unique
+ FunctionCallInformation info = funcInfo.get(node.getTypeName());
+ ATypeTag actualType = null;
+ try {
+ actualType = DeltaDataParser.getTypeTag(type, false, context);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ ATypeTag expectedType = node.getTypeTag();
+
+ boolean isNotExpected = actualType != expectedType;
+ if (isNotExpected) {
+ //If no warning is created, then it means it has been reported
+ Warning warning = null;
+ if (actualType != ATypeTag.SYSTEM_NULL) {
+ warning = info.createWarning(expectedType, actualType);
+ }
+ if (warning != null) {
+ //New warning that we saw for the first time. We should report it.
+ context.getWarnings().add(warning);
+ }
+ }
+ return isNotExpected;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..5ce2c78 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,8 +35,12 @@
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -43,6 +48,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,6 +58,7 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
@@ -87,6 +94,10 @@
if (serviceEndpoint != null) {
conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
}
+ conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+ configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+ conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+ configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -96,7 +107,21 @@
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
Snapshot snapshot = table.getLatestSnapshot(engine);
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+
+ List<Warning> warnings = new ArrayList<>();
+ DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+ AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+ StructType requiredSchema;
+ try {
+ ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+ Map<String, FunctionCallInformation> functionCallInformationMap =
+ HDFSUtils.getFunctionCallInformationMap(conf);
+ StructType fileSchema = snapshot.getSchema(engine);
+ requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
@@ -112,6 +137,18 @@
locationConstraints = configureLocationConstraints(appCtx);
configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
distributeFiles();
+ issueWarnings(warnings, warningCollector);
+ }
+
+ private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+ if (!warnings.isEmpty() && warningCollector.shouldWarn()) {
+ for (Warning warning : warnings) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(warning);
+ }
+ }
+ }
+ warnings.clear();
}
private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
index 81e465c..a404e2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.aws.delta.converter;
import java.io.DataOutput;
+import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -32,6 +33,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
public class DeltaConverterContext extends ParserContext {
@SuppressWarnings("unchecked")
@@ -47,8 +49,10 @@
private final int timeZoneOffset;
private final AMutableDate mutableDate = new AMutableDate(0);
private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final List<Warning> warnings;
- public DeltaConverterContext(Map<String, String> configuration) {
+ public DeltaConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+ this.warnings = warnings;
decimalToDouble = Boolean.parseBoolean(configuration
.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
timestampAsLong = Boolean.parseBoolean(configuration
@@ -96,4 +100,8 @@
public boolean isDateAsInt() {
return dateAsInt;
}
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index ea02d77..adb846d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -23,6 +23,8 @@
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,7 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.avro.AvroRuntimeException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -68,7 +71,8 @@
private final IExternalFilterValueEmbedder valueEmbedder;
public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
- parserContext = new DeltaConverterContext(conf);
+ List<Warning> warnings = new ArrayList<>();
+ parserContext = new DeltaConverterContext(conf, warnings);
valueEmbedder = context.getValueEmbedder();
}
@@ -91,7 +95,7 @@
for (int i = 0; i < schema.fields().size(); i++) {
DataType fieldSchema = schema.fields().get(i).getDataType();
String fieldName = schema.fieldNames().get(i);
- ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+ ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i), parserContext);
IValueReference value = null;
if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
value = valueEmbedder.getEmbeddedValue();
@@ -120,7 +124,7 @@
for (int i = 0; i < schema.fields().size(); i++) {
DataType fieldSchema = schema.fields().get(i).getDataType();
String fieldName = schema.fieldNames().get(i);
- ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+ ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index), parserContext);
IValueReference value = null;
if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
value = valueEmbedder.getEmbeddedValue();
@@ -157,7 +161,8 @@
parserContext.exitCollection(valueBuffer, arrayBuilder);
}
- private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+ public static ATypeTag getTypeTag(DataType schema, boolean isNull, DeltaConverterContext parserContext)
+ throws HyracksDataException {
if (isNull) {
return ATypeTag.NULL;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index bc5b8c3..50e72ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -922,7 +922,7 @@
public static boolean supportsPushdown(Map<String, String> properties) {
//Currently, only Apache Parquet format is supported
- return isParquetFormat(properties);
+ return isParquetFormat(properties) || isDeltaTable(properties);
}
/**