[ASTERIXDB-3503][EXT] Use delta kernel api instead of delta standalone api
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
The Delta Standalone API cannot read Delta tables created with newer
Delta write protocols. Furthermore, the current implementation does not
support other Delta table features such as column mapping and deletion
vectors. With this change we move to the Delta Kernel API to read Delta
tables.
Also, to support the column mapping and deletion vector features, instead
of just collecting the set of Parquet files to read and then reading them
through our own Parquet reader implementation, we now use the Delta
Kernel API itself to read the Parquet files.
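
For reference, the core Kernel read flow introduced here looks roughly as
follows (a minimal sketch against the delta-kernel 3.2.1 API; the table
path and Hadoop configuration are placeholders, and the types come from
io.delta.kernel and io.delta.kernel.defaults.engine):

    Engine engine = DefaultEngine.create(new Configuration());
    Table table = Table.forPath(engine, "s3a://bucket/delta-table");
    Snapshot snapshot = table.getLatestSnapshot(engine);
    Scan scan = snapshot.getScanBuilder(engine)
            .withReadSchema(engine, snapshot.getSchema(engine))
            .build();
    // scan.getScanState(engine) and scan.getScanFiles(engine) then drive
    // the per-partition parquet reads below.
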
Ext-ref: MB-63840
Change-Id: I0017e63ac0bddcfa0b342d9380d55934a76c12ec
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19047
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index f041fa2..1236636 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -196,7 +196,7 @@
Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
.schema(new StructType().add(new StructField("id", new IntegerType(), true))
.add(new StructField("name", new StringType(), true))
- .add(new StructField("age", new IntegerType(), true)))
+ .add(new StructField("age", new StringType(), true)))
.build();
txn.updateMetadata(metaData);
txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
index 2e77beb..63a83d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
@@ -30,6 +30,6 @@
(
%template%,
("container"="playground"),
- ("definition"="my-table-empty"),
+ ("definition"="delta-data/empty_delta_table"),
("table-format" = "delta")
);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index fdacc68..771929b 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -575,6 +575,22 @@
</dependency>
<dependency>
<groupId>io.delta</groupId>
+ <artifactId>delta-kernel-api</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-defaults</artifactId>
+ <version>3.2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index f5a2cd5..9909cc3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,35 +18,70 @@
*/
package org.apache.asterix.external.input.record.reader.aws.delta;
-import static org.apache.asterix.external.util.ExternalDataUtils.prepareDeltaTableFormat;
import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-public class AwsS3DeltaReaderFactory extends AwsS3ParquetReaderFactory {
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
private static final long serialVersionUID = 1L;
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+ private static final Logger LOGGER = LogManager.getLogger();
+ private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+ private Map<Integer, List<String>> schedule;
+ private String scanState;
+ private Map<String, String> configuration;
+ private List<String> scanFiles;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return locationConstraints;
+ }
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
-
+ this.configuration = configuration;
Configuration conf = new Configuration();
conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+ conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+ }
conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
if (serviceEndpoint != null) {
@@ -56,8 +91,77 @@
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
- prepareDeltaTableFormat(configuration, conf, tableMetadataPath);
- super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+
+ Engine engine = DefaultEngine.create(conf);
+ io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+ Snapshot snapshot = table.getLatestSnapshot(engine);
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+ scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+ CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+
+ scanFiles = new ArrayList<>();
+ while (iter.hasNext()) {
+ FilteredColumnarBatch batch = iter.next();
+ CloseableIterator<Row> rowIter = batch.getRows();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ scanFiles.add(RowSerDe.serializeRowToJson(row));
+ }
+ }
+ locationConstraints = configureLocationConstraints(appCtx);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ distributeFiles();
+ }
+
+ private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+
+ String[] locations = csm.getClusterLocations().getLocations();
+        if (scanFiles.isEmpty()) {
+ return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+ } else if (locations.length > scanFiles.size()) {
+            LOGGER.debug(
+                    "analytics partition count ({}) exceeds delta scan file count ({}); limiting ingestion partitions to the scan file count",
+                    locations.length, scanFiles.size());
+ final String[] locationCopy = locations.clone();
+ ArrayUtils.shuffle(locationCopy);
+ locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations);
+ }
+
+ private void distributeFiles() {
+ final int numComputePartitions = getPartitionConstraint().getLocations().length;
+ schedule = new HashMap<>();
+ for (int i = 0; i < numComputePartitions; i++) {
+ schedule.put(i, new ArrayList<>());
+ }
+ int i = 0;
+ for (String scanFile : scanFiles) {
+ schedule.get(i).add(scanFile);
+ i = (i + 1) % numComputePartitions;
+ }
+ }
+
+ @Override
+ public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+ try {
+ int partition = context.getPartition();
+ return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() throws AsterixException {
+ return Row.class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
}
@Override
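
In short, configure() snapshots the table once on the cluster controller,
serializes the kernel scan state and every scan-file row to JSON, and
round-robins the files across the selected partitions. A self-contained
sketch of that distribution step, mirroring distributeFiles() above:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class ScanFileScheduler {
        // Round-robin the serialized scan files over the compute partitions,
        // as distributeFiles() does; partition i gets files i, i+n, i+2n, ...
        static Map<Integer, List<String>> distribute(List<String> scanFiles, int partitions) {
            Map<Integer, List<String>> schedule = new HashMap<>();
            for (int p = 0; p < partitions; p++) {
                schedule.put(p, new ArrayList<>());
            }
            int p = 0;
            for (String file : scanFiles) {
                schedule.get(p).add(file);
                p = (p + 1) % partitions;
            }
            return schedule;
        }
    }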
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
new file mode 100644
index 0000000..13c3d0c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BasePrimitiveType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.FieldMetadata;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Serialize and deserialize Delta data types {@link DataType} to JSON and from JSON class based on
+ * the <a
+ * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types">serialization
+ * rules </a> outlined in the Delta Protocol.
+ */
+public class DataTypeJsonSerDe {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .registerModule(new SimpleModule().addSerializer(StructType.class, new StructTypeSerializer()));
+
+ private DataTypeJsonSerDe() {
+ }
+
+ /**
+     * Serializes a {@link DataType} to a JSON string according to the Delta Protocol.
+     * TODO: this API was only added because of the Flink-Kernel dependency; Flink-Kernel
+     * currently uses the Kernel DataType.toJson and the Standalone DataType.fromJson to
+     * convert between types.
+ *
+ * @param dataType
+ * @return JSON string representing the data type
+ */
+ public static String serializeDataType(DataType dataType) {
+ try {
+ StringWriter stringWriter = new StringWriter();
+ JsonGenerator generator = OBJECT_MAPPER.createGenerator(stringWriter);
+ writeDataType(generator, dataType);
+ generator.flush();
+ return stringWriter.toString();
+ } catch (IOException ex) {
+ throw new KernelException("Could not serialize DataType to JSON", ex);
+ }
+ }
+
+ /**
+ * Deserializes a JSON string representing a Delta data type to a {@link DataType}.
+ *
+ * @param structTypeJson JSON string representing a {@link StructType} data type
+ */
+ public static StructType deserializeStructType(String structTypeJson) {
+ try {
+ DataType parsedType = parseDataType(OBJECT_MAPPER.reader().readTree(structTypeJson), "" /* fieldPath */,
+ new FieldMetadata.Builder().build() /* collationsMetadata */);
+ if (parsedType instanceof StructType) {
+ return (StructType) parsedType;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not parse the following JSON as a valid StructType:\n%s", structTypeJson));
+ }
+ } catch (JsonProcessingException ex) {
+ throw new KernelException(format("Could not parse schema given as JSON string: %s", structTypeJson), ex);
+ }
+ }
+
+ /**
+ * Parses a Delta data type from JSON. Data types can either be serialized as strings (for
+ * primitive types) or as objects (for complex types).
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * // Map type field is serialized as:
+ * {
+ * "name" : "f",
+ * "type" : {
+ * "type" : "map",
+ * "keyType" : "string",
+ * "valueType" : "string",
+ * "valueContainsNull" : true
+ * },
+ * "nullable" : true,
+ * "metadata" : { }
+ * }
+ *
+ * // Integer type field serialized as:
+ * {
+ * "name" : "a",
+ * "type" : "integer",
+ * "nullable" : false,
+ * "metadata" : { }
+ * }
+ *
+ * // Collated string type field serialized as:
+ * {
+ * "name" : "s",
+ * "type" : "string",
+ * "nullable", false,
+ * "metadata" : {
+ * "__COLLATIONS": { "s": "ICU.de_DE" }
+ * }
+ * }
+ *
+ * // Array with collated strings field serialized as:
+ * {
+ * "name" : "arr",
+ * "type" : {
+ * "type" : "array",
+ * "elementType" : "string",
+ * "containsNull" : false
+     *   },
+ * "nullable" : false,
+ * "metadata" : {
+ * "__COLLATIONS": { "arr.element": "ICU.de_DE" }
+ * }
+ * }
+ * </pre>
+ *
+ * @param fieldPath Path from the nearest ancestor that is of the {@link StructField} type. For
+ * example, "c1.key.element" represents a path starting from the {@link StructField} named
+ * "c1." The next element, "key," indicates that "c1" stores a {@link MapType} type. The final
+ * element, "element", shows that the key of the map is an {@link ArrayType} type.
+ * @param collationsMetadata Metadata that maps the path of a {@link StringType} to its collation.
+ * Only maps non-UTF8_BINARY collated {@link StringType}. Collation metadata is stored in the
+ * nearest ancestor, which is the StructField. This is because StructField includes a metadata
+ * field, whereas Map and Array do not, making them unable to store this information. Paths
+ * are in same form as `fieldPath`. <a
+ * href="https://github.com/delta-io/delta/blob/master/protocol_rfcs/collated-string-type.md#collation-identifiers">Docs</a>
+ */
+ static DataType parseDataType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ switch (json.getNodeType()) {
+ case STRING:
+ // simple types are stored as just a string
+ return nameToType(json.textValue(), fieldPath, collationsMetadata);
+ case OBJECT:
+                // complex types (array, map, or struct) are stored as JSON objects
+ String type = getStringField(json, "type");
+ switch (type) {
+ case "struct":
+ assertValidTypeForCollations(fieldPath, "struct", collationsMetadata);
+ return parseStructType(json);
+ case "array":
+ assertValidTypeForCollations(fieldPath, "array", collationsMetadata);
+ return parseArrayType(json, fieldPath, collationsMetadata);
+ case "map":
+ assertValidTypeForCollations(fieldPath, "map", collationsMetadata);
+ return parseMapType(json, fieldPath, collationsMetadata);
+ // No default case here; fall through to the following error when no match
+ }
+ default:
+ throw new IllegalArgumentException(
+ String.format("Could not parse the following JSON as a valid Delta data type:\n%s", json));
+ }
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#array-type">array
+ * type </a>
+ */
+ private static ArrayType parseArrayType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ checkArgument(json.isObject() && json.size() == 3,
+ "Expected JSON object with 3 fields for array data type but got:\n%s", json);
+ boolean containsNull = getBooleanField(json, "containsNull");
+ DataType dataType =
+ parseDataType(getNonNullField(json, "elementType"), fieldPath + ".element", collationsMetadata);
+ return new ArrayType(dataType, containsNull);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#map-type">map type
+ * </a>
+ */
+ private static MapType parseMapType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ checkArgument(json.isObject() && json.size() == 4,
+ "Expected JSON object with 4 fields for map data type but got:\n%s", json);
+ boolean valueContainsNull = getBooleanField(json, "valueContainsNull");
+ DataType keyType = parseDataType(getNonNullField(json, "keyType"), fieldPath + ".key", collationsMetadata);
+ DataType valueType =
+ parseDataType(getNonNullField(json, "valueType"), fieldPath + ".value", collationsMetadata);
+ return new MapType(keyType, valueType, valueContainsNull);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-type">
+ * struct type </a>
+ */
+ private static StructType parseStructType(JsonNode json) {
+ checkArgument(json.isObject() && json.size() == 2,
+ "Expected JSON object with 2 fields for struct data type but got:\n%s", json);
+ JsonNode fieldsNode = getNonNullField(json, "fields");
+ checkArgument(fieldsNode.isArray(), "Expected array for fieldName=%s in:\n%s", "fields", json);
+ Iterator<JsonNode> fields = fieldsNode.elements();
+ List<StructField> parsedFields = new ArrayList<>();
+ while (fields.hasNext()) {
+ parsedFields.add(parseStructField(fields.next()));
+ }
+ return new StructType(parsedFields);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field">
+ * struct field </a>
+ */
+ private static StructField parseStructField(JsonNode json) {
+ checkArgument(json.isObject(), "Expected JSON object for struct field");
+ String name = getStringField(json, "name");
+ FieldMetadata metadata = parseFieldMetadata(json.get("metadata"), false);
+ DataType type = parseDataType(getNonNullField(json, "type"), name, null);
+ boolean nullable = getBooleanField(json, "nullable");
+ return new StructField(name, type, nullable, metadata);
+ }
+
+    /** Parses a {@link FieldMetadata}. */
+ private static FieldMetadata parseFieldMetadata(JsonNode json) {
+ return parseFieldMetadata(json, true);
+ }
+
+ /**
+ * Parses a {@link FieldMetadata}, optionally including collation metadata, depending on
+     * `includeCollationsMetadata`.
+     */
+    private static FieldMetadata parseFieldMetadata(JsonNode json, boolean includeCollationsMetadata) {
+ if (json == null || json.isNull()) {
+ return FieldMetadata.empty();
+ }
+
+ checkArgument(json.isObject(), "Expected JSON object for struct field metadata");
+ final Iterator<Map.Entry<String, JsonNode>> iterator = json.fields();
+ final FieldMetadata.Builder builder = FieldMetadata.builder();
+ while (iterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iterator.next();
+ JsonNode value = entry.getValue();
+ String key = entry.getKey();
+
+ if (value.isNull()) {
+ builder.putNull(key);
+ } else if (value.isIntegralNumber()) { // covers both int and long
+ builder.putLong(key, value.longValue());
+ } else if (value.isDouble()) {
+ builder.putDouble(key, value.doubleValue());
+ } else if (value.isBoolean()) {
+ builder.putBoolean(key, value.booleanValue());
+ } else if (value.isTextual()) {
+ builder.putString(key, value.textValue());
+ } else if (value.isObject()) {
+ builder.putFieldMetadata(key, parseFieldMetadata(value));
+ } else if (value.isArray()) {
+ final Iterator<JsonNode> fields = value.elements();
+ if (!fields.hasNext()) {
+ // If it is an empty array, we cannot infer its element type.
+ // We put an empty Array[Long].
+ builder.putLongArray(key, new Long[0]);
+ } else {
+ final JsonNode head = fields.next();
+ if (head.isInt()) {
+ builder.putLongArray(key,
+ buildList(value, node -> (long) node.intValue()).toArray(new Long[0]));
+ } else if (head.isDouble()) {
+ builder.putDoubleArray(key, buildList(value, JsonNode::doubleValue).toArray(new Double[0]));
+ } else if (head.isBoolean()) {
+ builder.putBooleanArray(key, buildList(value, JsonNode::booleanValue).toArray(new Boolean[0]));
+ } else if (head.isTextual()) {
+ builder.putStringArray(key, buildList(value, JsonNode::textValue).toArray(new String[0]));
+ } else if (head.isObject()) {
+ builder.putFieldMetadataArray(key,
+ buildList(value, DataTypeJsonSerDe::parseFieldMetadata).toArray(new FieldMetadata[0]));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported type for Array as field metadata value: %s", value));
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported type for field metadata value: %s", value));
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * For an array JSON node builds a {@link List} using the provided {@code accessor} for each
+ * element.
+ */
+ private static <T> List<T> buildList(JsonNode json, Function<JsonNode, T> accessor) {
+ List<T> result = new ArrayList<>();
+ Iterator<JsonNode> elements = json.elements();
+ while (elements.hasNext()) {
+ result.add(accessor.apply(elements.next()));
+ }
+ return result;
+ }
+
+    private static final String FIXED_DECIMAL_REGEX = "decimal\\(\\s*(\\d+)\\s*,\\s*(\\-?\\d+)\\s*\\)";
+    private static final Pattern FIXED_DECIMAL_PATTERN = Pattern.compile(FIXED_DECIMAL_REGEX);
+
+ /** Parses primitive string type names to a {@link DataType} */
+ private static DataType nameToType(String name, String fieldPath, FieldMetadata collationsMetadata) {
+ if (BasePrimitiveType.isPrimitiveType(name)) {
+ return BasePrimitiveType.createPrimitive(name);
+ } else if (name.equals("decimal")) {
+ return DecimalType.USER_DEFAULT;
+ } else if ("void".equalsIgnoreCase(name)) {
+ // Earlier versions of Delta had VOID type which is not specified in Delta Protocol.
+ // It is not readable or writable. Throw a user-friendly error message.
+ throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+ } else {
+ // decimal has a special pattern with a precision and scale
+ Matcher decimalMatcher = FIXED_DECIMAL_PATTERN.matcher(name);
+ if (decimalMatcher.matches()) {
+ int precision = Integer.parseInt(decimalMatcher.group(1));
+ int scale = Integer.parseInt(decimalMatcher.group(2));
+ return new DecimalType(precision, scale);
+ }
+
+            // We have encountered a type that is beyond the protocol specification
+            // checks above. This must be a type that is invalid according to the
+            // protocol, not merely a data type that Kernel does not support.
+ throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+ }
+ }
+
+ private static JsonNode getNonNullField(JsonNode rootNode, String fieldName) {
+ JsonNode node = rootNode.get(fieldName);
+ if (node == null || node.isNull()) {
+ throw new IllegalArgumentException(
+ String.format("Expected non-null for fieldName=%s in:\n%s", fieldName, rootNode));
+ }
+ return node;
+ }
+
+ private static String getStringField(JsonNode rootNode, String fieldName) {
+ JsonNode node = getNonNullField(rootNode, fieldName);
+ checkArgument(node.isTextual(), "Expected string for fieldName=%s in:\n%s", fieldName, rootNode);
+        return node.textValue(); // safe: isTextual() is checked above, so this is a string value
+ }
+
+    private static void assertValidTypeForCollations(String fieldPath, String fieldType,
+            FieldMetadata collationsMetadata) {
+        // No-op in this port: collated string types are not supported here, so there
+        // is no collation metadata to validate.
+    }
+
+ private static boolean getBooleanField(JsonNode rootNode, String fieldName) {
+ JsonNode node = getNonNullField(rootNode, fieldName);
+ checkArgument(node.isBoolean(), "Expected boolean for fieldName=%s in:\n%s", fieldName, rootNode);
+ return node.booleanValue();
+ }
+
+ protected static class StructTypeSerializer extends StdSerializer<StructType> {
+ private static final long serialVersionUID = 1L;
+
+ public StructTypeSerializer() {
+ super(StructType.class);
+ }
+
+ @Override
+ public void serialize(StructType structType, JsonGenerator gen, SerializerProvider provider)
+ throws IOException {
+ writeDataType(gen, structType);
+ }
+ }
+
+ private static void writeDataType(JsonGenerator gen, DataType dataType) throws IOException {
+ if (dataType instanceof StructType) {
+ writeStructType(gen, (StructType) dataType);
+ } else if (dataType instanceof ArrayType) {
+ writeArrayType(gen, (ArrayType) dataType);
+ } else if (dataType instanceof MapType) {
+ writeMapType(gen, (MapType) dataType);
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dataType;
+ gen.writeString(format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale()));
+ } else {
+ gen.writeString(dataType.toString());
+ }
+ }
+
+ private static void writeArrayType(JsonGenerator gen, ArrayType arrayType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "array");
+ gen.writeFieldName("elementType");
+ writeDataType(gen, arrayType.getElementType());
+ gen.writeBooleanField("containsNull", arrayType.containsNull());
+ gen.writeEndObject();
+ }
+
+ private static void writeMapType(JsonGenerator gen, MapType mapType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "map");
+ gen.writeFieldName("keyType");
+ writeDataType(gen, mapType.getKeyType());
+ gen.writeFieldName("valueType");
+ writeDataType(gen, mapType.getValueType());
+ gen.writeBooleanField("valueContainsNull", mapType.isValueContainsNull());
+ gen.writeEndObject();
+ }
+
+ private static void writeStructType(JsonGenerator gen, StructType structType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "struct");
+ gen.writeArrayFieldStart("fields");
+ for (StructField field : structType.fields()) {
+ writeStructField(gen, field);
+ }
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
+
+ private static void writeStructField(JsonGenerator gen, StructField field) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("name", field.getName());
+ gen.writeFieldName("type");
+ writeDataType(gen, field.getDataType());
+ gen.writeBooleanField("nullable", field.isNullable());
+ gen.writeFieldName("metadata");
+ writeFieldMetadata(gen, field.getMetadata());
+ gen.writeEndObject();
+ }
+
+ private static void writeFieldMetadata(JsonGenerator gen, FieldMetadata metadata) throws IOException {
+ gen.writeStartObject();
+ for (Map.Entry<String, Object> entry : metadata.getEntries().entrySet()) {
+ gen.writeFieldName(entry.getKey());
+ Object value = entry.getValue();
+ if (value instanceof Long) {
+ gen.writeNumber((Long) value);
+ } else if (value instanceof Double) {
+ gen.writeNumber((Double) value);
+ } else if (value instanceof Boolean) {
+ gen.writeBoolean((Boolean) value);
+ } else if (value instanceof String) {
+ gen.writeString((String) value);
+ } else if (value instanceof FieldMetadata) {
+ writeFieldMetadata(gen, (FieldMetadata) value);
+ } else if (value instanceof Long[]) {
+ gen.writeStartArray();
+ for (Long v : (Long[]) value) {
+ gen.writeNumber(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof Double[]) {
+ gen.writeStartArray();
+ for (Double v : (Double[]) value) {
+ gen.writeNumber(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof Boolean[]) {
+ gen.writeStartArray();
+ for (Boolean v : (Boolean[]) value) {
+ gen.writeBoolean(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof String[]) {
+ gen.writeStartArray();
+ for (String v : (String[]) value) {
+ gen.writeString(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof FieldMetadata[]) {
+ gen.writeStartArray();
+ for (FieldMetadata v : (FieldMetadata[]) value) {
+ writeFieldMetadata(gen, v);
+ }
+ gen.writeEndArray();
+ } else if (value == null) {
+ gen.writeNull();
+ } else {
+ throw new IllegalArgumentException(format("Unsupported type for field metadata value: %s", value));
+ }
+ }
+ gen.writeEndObject();
+ }
+}
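
A round trip through the two public entry points looks like this (a sketch;
it assumes kernel's singleton primitive type instances such as
IntegerType.INTEGER and the StructType.add(String, DataType, boolean)
overload):

    import io.delta.kernel.types.IntegerType;
    import io.delta.kernel.types.StringType;
    import io.delta.kernel.types.StructType;

    StructType schema = new StructType()
            .add("id", IntegerType.INTEGER, true)
            .add("name", StringType.STRING, true);
    // Serialize per the protocol rules implemented above, then parse back.
    String json = DataTypeJsonSerDe.serializeDataType(schema);
    StructType parsed = DataTypeJsonSerDe.deserializeStructType(json);
    // parsed is expected to equal schema for types without collation metadata.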
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
new file mode 100644
index 0000000..558f8a9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+/**
+ * Delta record reader.
+ * The reader returns records in Delta Kernel Row format.
+ */
+public class DeltaFileRecordReader implements IRecordReader<Row> {
+
+ private Engine engine;
+ private List<Row> scanFiles;
+ private Row scanState;
+ protected IRawRecord<Row> record;
+ protected VoidPointable value = null;
+ private FileStatus fileStatus;
+ private StructType physicalReadSchema;
+ private CloseableIterator<ColumnarBatch> physicalDataIter;
+ private CloseableIterator<FilteredColumnarBatch> dataIter;
+ private int fileIndex;
+ private Row scanFile;
+ private CloseableIterator<Row> rows;
+
+ public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
+ IExternalDataRuntimeContext context) {
+ Configuration config = new Configuration();
+ config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+ config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+ config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+ }
+ config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
+ String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
+ if (serviceEndpoint != null) {
+ config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+ }
+ this.engine = DefaultEngine.create(config);
+ this.scanFiles = new ArrayList<>();
+ for (String scanFile : serScanFiles) {
+ this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
+ }
+ this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+ this.fileStatus = null;
+ this.physicalReadSchema = null;
+ this.physicalDataIter = null;
+ this.dataIter = null;
+ this.record = new GenericRecord<>();
+        if (!scanFiles.isEmpty()) {
+ this.fileIndex = 0;
+ this.scanFile = scanFiles.get(0);
+ this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+ this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+ try {
+ this.physicalDataIter = engine.getParquetHandler()
+ .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+ this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ if (dataIter.hasNext()) {
+ rows = dataIter.next().getRows();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (dataIter != null) {
+ dataIter.close();
+ }
+ if (physicalDataIter != null) {
+ physicalDataIter.close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (rows != null && rows.hasNext()) {
+ return true;
+ } else if (dataIter != null && dataIter.hasNext()) {
+ rows = dataIter.next().getRows();
+ return this.hasNext();
+ } else if (fileIndex < scanFiles.size() - 1) {
+ fileIndex++;
+ scanFile = scanFiles.get(fileIndex);
+ fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+ physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+ physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+ physicalReadSchema, Optional.empty());
+ dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ return this.hasNext();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public IRawRecord<Row> next() throws IOException, InterruptedException {
+ Row row = rows.next();
+ record.set(row);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+
+ }
+
+ @Override
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+}
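
Consumption follows the usual IRecordReader contract: hasNext() advances
transparently across row batches and scan files before returning false. A
usage sketch, assuming reader was obtained from
AwsS3DeltaReaderFactory.createRecordReader():

    // Drain the reader; each record wraps a kernel Row already transformed
    // to the logical schema (column mapping and deletion vectors applied).
    while (reader.hasNext()) {
        IRawRecord<Row> record = reader.next();
        // hand the Row off to DeltaDataParser
    }
    reader.close();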
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
new file mode 100644
index 0000000..d91cd9a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} object.
+ */
+public class RowSerDe {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private RowSerDe() {
+ }
+
+ /**
+ * Utility method to serialize a {@link Row} as a JSON string
+ */
+ public static String serializeRowToJson(Row row) {
+ Map<String, Object> rowObject = convertRowToJsonObject(row);
+ try {
+ Map<String, Object> rowWithSchema = new HashMap<>();
+ rowWithSchema.put("schema", row.getSchema().toJson());
+ rowWithSchema.put("row", rowObject);
+ return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+ } catch (JsonProcessingException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Utility method to deserialize a {@link Row} object from the JSON form.
+ */
+ public static Row deserializeRowFromJson(String jsonRowWithSchema) {
+ try {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+ JsonNode schemaNode = jsonNode.get("schema");
+ StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaNode.asText());
+ return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
+ } catch (JsonProcessingException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ private static Map<String, Object> convertRowToJsonObject(Row row) {
+ StructType rowType = row.getSchema();
+ Map<String, Object> rowObject = new HashMap<>();
+ for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+ StructField field = rowType.at(fieldId);
+ DataType fieldType = field.getDataType();
+ String name = field.getName();
+
+ if (row.isNullAt(fieldId)) {
+ rowObject.put(name, null);
+ continue;
+ }
+
+ Object value;
+ if (fieldType instanceof BooleanType) {
+ value = row.getBoolean(fieldId);
+ } else if (fieldType instanceof ByteType) {
+ value = row.getByte(fieldId);
+ } else if (fieldType instanceof ShortType) {
+ value = row.getShort(fieldId);
+ } else if (fieldType instanceof IntegerType) {
+ value = row.getInt(fieldId);
+ } else if (fieldType instanceof LongType) {
+ value = row.getLong(fieldId);
+ } else if (fieldType instanceof FloatType) {
+ value = row.getFloat(fieldId);
+ } else if (fieldType instanceof DoubleType) {
+ value = row.getDouble(fieldId);
+ } else if (fieldType instanceof DateType) {
+ value = row.getInt(fieldId);
+ } else if (fieldType instanceof TimestampType) {
+ value = row.getLong(fieldId);
+ } else if (fieldType instanceof StringType) {
+ value = row.getString(fieldId);
+ } else if (fieldType instanceof ArrayType) {
+ value = VectorUtils.toJavaList(row.getArray(fieldId));
+ } else if (fieldType instanceof MapType) {
+ value = VectorUtils.toJavaMap(row.getMap(fieldId));
+ } else if (fieldType instanceof StructType) {
+ Row subRow = row.getStruct(fieldId);
+ value = convertRowToJsonObject(subRow);
+ } else {
+ throw new UnsupportedOperationException("NYI");
+ }
+
+ rowObject.put(name, value);
+ }
+
+ return rowObject;
+ }
+
+ private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType) {
+ return new DefaultJsonRow(rowJsonNode, rowType);
+ }
+}
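
This is the piece that lets a scan cross the wire: the cluster controller
serializes the scan state and each scan-file Row, and every reader
deserializes them back before calling Scan.transformPhysicalData(). A
sketch of the round trip, with scan and engine as in the factory:

    // CC side: kernel Row -> JSON string; the row's schema travels with it.
    String stateJson = RowSerDe.serializeRowToJson(scan.getScanState(engine));
    // NC side: JSON string -> kernel Row, ready for physical-data transforms.
    Row restored = RowSerDe.deserializeRowFromJson(stateJson);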
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
new file mode 100644
index 0000000..e56be86
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> {
+ private final ParserContext parserContext;
+ private final IExternalFilterValueEmbedder valueEmbedder;
+
+ public DeltaDataParser(IExternalDataRuntimeContext context) {
+ parserContext = new ParserContext();
+ valueEmbedder = context.getValueEmbedder();
+ }
+
+ @Override
+ public boolean parse(IRawRecord<? extends Row> record, DataOutput out) throws HyracksDataException {
+ try {
+ parseObject(record.get(), out);
+ valueEmbedder.reset();
+ return true;
+ } catch (AvroRuntimeException | IOException e) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
+ }
+
+ private void parseObject(Row record, DataOutput out) throws IOException {
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ StructType schema = record.getSchema();
+ valueEmbedder.enterObject();
+ for (int i = 0; i < schema.fields().size(); i++) {
+ DataType fieldSchema = schema.fields().get(i).getDataType();
+ String fieldName = schema.fieldNames().get(i);
+ ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+ IValueReference value = null;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldSchema, record, i, valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+ objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseObject(StructType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ valueEmbedder.enterObject();
+ for (int i = 0; i < schema.fields().size(); i++) {
+ DataType fieldSchema = schema.fields().get(i).getDataType();
+ String fieldName = schema.fieldNames().get(i);
+ ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+ IValueReference value = null;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldSchema, column.getChild(i), index, valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+ objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseArray(ArrayType arraySchema, ArrayValue arrayValue, DataOutput out) throws IOException {
+ DataType elementSchema = arraySchema.getElementType();
+ ColumnVector elements = arrayValue.getElements();
+ final IMutableValueStorage valueBuffer = parserContext.enterCollection();
+ final IAsterixListBuilder arrayBuilder =
+ parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+ for (int i = 0; i < elements.getSize(); i++) {
+ valueBuffer.reset();
+ parseValue(elementSchema, elements, i, valueBuffer.getDataOutput());
+ arrayBuilder.addItem(valueBuffer);
+ }
+ arrayBuilder.write(out, true);
+ parserContext.exitCollection(valueBuffer, arrayBuilder);
+ }
+
+ private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+ if (isNull) {
+ return ATypeTag.NULL;
+ }
+
+ if (schema instanceof BooleanType) {
+ return ATypeTag.BOOLEAN;
+ } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) {
+ return ATypeTag.BIGINT;
+ } else if (schema instanceof DoubleType) {
+ return ATypeTag.DOUBLE;
+ } else if (schema instanceof StringType) {
+ return ATypeTag.STRING;
+ } else if (schema instanceof DateType) {
+ return ATypeTag.BIGINT;
+ } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) {
+ return ATypeTag.BIGINT;
+ } else if (schema instanceof BinaryType) {
+ return ATypeTag.BINARY;
+ } else if (schema instanceof ArrayType) {
+ return ATypeTag.ARRAY;
+ } else if (schema instanceof StructType) {
+ return ATypeTag.OBJECT;
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
+ private void parseValue(DataType schema, Row row, int index, DataOutput out) throws IOException {
+ if (row.isNullAt(index)) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (schema instanceof BooleanType) {
+ if (row.getBoolean(index)) {
+ booleanSerde.serialize(ABoolean.TRUE, out);
+ } else {
+ booleanSerde.serialize(ABoolean.FALSE, out);
+ }
+ } else if (schema instanceof ShortType) {
+ serializeLong(row.getShort(index), out);
+ } else if (schema instanceof IntegerType) {
+ serializeLong(row.getInt(index), out);
+ } else if (schema instanceof LongType) {
+ serializeLong(row.getLong(index), out);
+ } else if (schema instanceof DoubleType) {
+ serializeDouble(row.getDouble(index), out);
+ } else if (schema instanceof StringType) {
+ serializeString(row.getString(index), out);
+ } else if (schema instanceof DateType) {
+ serializeDate(row.getInt(index), out);
+ } else if (schema instanceof TimestampType) {
+ serializeTimestamp(row.getLong(index), out);
+ } else if (schema instanceof TimestampNTZType) {
+ serializeTimestamp(row.getLong(index), out);
+ } else if (schema instanceof StructType) {
+ parseObject(row.getStruct(index), out);
+ } else if (schema instanceof ArrayType) {
+ parseArray((ArrayType) schema, row.getArray(index), out);
+ } else if (schema instanceof DecimalType) {
+ serializeDecimal(row.getDecimal(index), out);
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
+ private void parseValue(DataType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+ if (column.isNullAt(index)) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (schema instanceof BooleanType) {
+ if (column.getBoolean(index)) {
+ booleanSerde.serialize(ABoolean.TRUE, out);
+ } else {
+ booleanSerde.serialize(ABoolean.FALSE, out);
+ }
+ } else if (schema instanceof ShortType) {
+ serializeLong(column.getShort(index), out);
+ } else if (schema instanceof IntegerType) {
+ serializeLong(column.getInt(index), out);
+ } else if (schema instanceof LongType) {
+ serializeLong(column.getLong(index), out);
+ } else if (schema instanceof DoubleType) {
+ serializeDouble(column.getDouble(index), out);
+ } else if (schema instanceof StringType) {
+ serializeString(column.getString(index), out);
+ } else if (schema instanceof DateType) {
+ serializeDate(column.getInt(index), out);
+ } else if (schema instanceof TimestampType) {
+ serializeTimestamp(column.getLong(index), out);
+ } else if (schema instanceof TimestampNTZType) {
+ serializeTimestamp(column.getLong(index), out);
+ } else if (schema instanceof ArrayType) {
+ parseArray((ArrayType) schema, column.getArray(index), out);
+ } else if (schema instanceof StructType) {
+ parseObject((StructType) schema, column, index, out);
+ } else if (schema instanceof DecimalType) {
+ serializeDecimal(column.getDecimal(index), out);
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
+ private void serializeLong(Object value, DataOutput out) throws HyracksDataException {
+ long intValue = ((Number) value).longValue();
+ aInt64.setValue(intValue);
+ int64Serde.serialize(aInt64, out);
+ }
+
+ private void serializeDouble(Object value, DataOutput out) throws HyracksDataException {
+ double doubleValue = ((Number) value).doubleValue();
+ aDouble.setValue(doubleValue);
+ doubleSerde.serialize(aDouble, out);
+ }
+
+ private void serializeString(Object value, DataOutput out) throws HyracksDataException {
+ aString.setValue(value.toString());
+ stringSerde.serialize(aString, out);
+ }
+
+ private void serializeDate(Object value, DataOutput out) throws HyracksDataException {
+ aInt32.setValue((Integer) value);
+ int32Serde.serialize(aInt32, out);
+ }
+
+ private void serializeTimestamp(Object value, DataOutput out) throws HyracksDataException {
+ aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value));
+ int64Serde.serialize(aInt64, out);
+ }
+
+ private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException {
+ serializeDouble(value.doubleValue(), out);
+ }
+
+ private static HyracksDataException createUnsupportedException(DataType schema) {
+ return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString());
+ }
+}
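
Wired together, the reader produces IRawRecord<Row> instances and this
parser turns each one into an open ADM record (a sketch; context and out
are supplied by the surrounding runtime pipeline):

    DeltaDataParser parser = new DeltaDataParser(context);
    while (reader.hasNext()) {
        IRawRecord<? extends Row> record = reader.next();
        parser.parse(record, out); // writes one open ADM object per Delta row
    }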
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
new file mode 100644
index 0000000..5d4b2dd
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+
+import io.delta.kernel.data.Row;
+
+public class DeltaTableDataParserFactory extends AbstractGenericDataParserFactory<Row> {
+
+ private static final long serialVersionUID = 1L;
+ private static final List<String> PARSER_FORMAT = List.of(ExternalDataConstants.FORMAT_DELTA);
+
+ @Override
+ public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+ throw new UnsupportedOperationException("Stream parser is not supported");
+ }
+
+ @Override
+ public void setMetaType(ARecordType metaType) {
+ // no MetaType to set.
+ }
+
+ @Override
+ public List<String> getParserFormats() {
+ return PARSER_FORMAT;
+ }
+
+ @Override
+ public IRecordDataParser<Row> createRecordParser(IExternalDataRuntimeContext context) {
+ return createParser(context);
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return Row.class;
+ }
+
+ private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
+ return new DeltaDataParser(context);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index 97b04a4..5f0c018 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaFileRecordReader;
import org.apache.asterix.external.input.record.reader.stream.AvroRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -111,8 +112,9 @@
if (format != null) {
if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_AVRO)) {
return AvroRecordReader.class;
- }
- if (recordReaders.containsKey(format)) {
+ } else if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_DELTA)) {
+ return DeltaFileRecordReader.class;
+ } else if (recordReaders.containsKey(format)) {
return findRecordReaderClazzWithConfig(configuration, format);
}
throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index b4314b9..bf74079 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -108,10 +108,6 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
-import io.delta.standalone.DeltaLog;
-import io.delta.standalone.Snapshot;
-import io.delta.standalone.actions.AddFile;
-
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
@@ -474,8 +470,8 @@
if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
if (isDeltaTable(configuration)) {
- configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
- configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_DELTA);
}
prepareTableFormat(configuration);
}
@@ -499,23 +495,6 @@
}
}
- public static void prepareDeltaTableFormat(Map<String, String> configuration, Configuration conf,
- String tableMetadataPath) {
- DeltaLog deltaLog = DeltaLog.forTable(conf, tableMetadataPath);
- Snapshot snapshot = deltaLog.snapshot();
- List<AddFile> dataFiles = snapshot.getAllFiles();
- StringBuilder builder = new StringBuilder();
- for (AddFile batchFile : dataFiles) {
- builder.append(",");
- String path = batchFile.getPath();
- builder.append(tableMetadataPath).append('/').append(path);
- }
- if (builder.length() > 0) {
- builder.deleteCharAt(0);
- }
- configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());
- }
-
public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
String tableMetadataPath) throws AlgebricksException {
HadoopTables tables = new HadoopTables(conf);
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index bbf5195..793516c 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -22,4 +22,5 @@
org.apache.asterix.external.parser.factory.RSSParserFactory
org.apache.asterix.external.parser.factory.TweetParserFactory
org.apache.asterix.external.parser.factory.NoOpDataParserFactory
-org.apache.asterix.external.parser.factory.AvroDataParserFactory
\ No newline at end of file
+org.apache.asterix.external.parser.factory.AvroDataParserFactory
+org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
\ No newline at end of file
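
The new factory is picked up through the JDK ServiceLoader convention that
this services file feeds (an illustrative sketch; on the AsterixDB side a
parser-factory provider performs the equivalent lookup):

    import java.util.ServiceLoader;
    import org.apache.asterix.external.api.IDataParserFactory;

    for (IDataParserFactory factory : ServiceLoader.load(IDataParserFactory.class)) {
        // DeltaTableDataParserFactory now appears here and advertises the
        // delta format via getParserFormats().
    }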