Merge branch 'gerrit/goldfish' into 'master'
Change-Id: Ic8b85f85eb80fcfd046ebfca721de09c0ad5c753
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index f041fa2..1236636 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -196,7 +196,7 @@
Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
.schema(new StructType().add(new StructField("id", new IntegerType(), true))
.add(new StructField("name", new StringType(), true))
- .add(new StructField("age", new IntegerType(), true)))
+ .add(new StructField("age", new StringType(), true)))
.build();
txn.updateMetadata(metaData);
txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
index 2e77beb..63a83d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
@@ -30,6 +30,6 @@
(
%template%,
("container"="playground"),
- ("definition"="my-table-empty"),
+ ("definition"="delta-data/empty_delta_table"),
("table-format" = "delta")
);
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c8ad10..27c3ac1 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -579,6 +579,22 @@
</dependency>
<dependency>
<groupId>io.delta</groupId>
+ <artifactId>delta-kernel-api</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-defaults</artifactId>
+ <version>3.2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index f5a2cd5..9909cc3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,35 +18,70 @@
*/
package org.apache.asterix.external.input.record.reader.aws.delta;
-import static org.apache.asterix.external.util.ExternalDataUtils.prepareDeltaTableFormat;
import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-public class AwsS3DeltaReaderFactory extends AwsS3ParquetReaderFactory {
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
private static final long serialVersionUID = 1L;
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+ private static final Logger LOGGER = LogManager.getLogger();
+ private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+ private Map<Integer, List<String>> schedule;
+ private String scanState;
+ private Map<String, String> configuration;
+ private List<String> scanFiles;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return locationConstraints;
+ }
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
-
+ this.configuration = configuration;
Configuration conf = new Configuration();
conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+ conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+ }
conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
if (serviceEndpoint != null) {
@@ -56,8 +91,77 @@
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
- prepareDeltaTableFormat(configuration, conf, tableMetadataPath);
- super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+
+ Engine engine = DefaultEngine.create(conf);
+ io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+ Snapshot snapshot = table.getLatestSnapshot(engine);
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+ scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+ CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+
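+        // Kernel Row objects are not Java-serializable, so the scan state and every
+        // scan-file row are shipped to the compute partitions as JSON (via RowSerDe)
+        // and rebuilt there by DeltaFileRecordReader.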
+ scanFiles = new ArrayList<>();
+ while (iter.hasNext()) {
+ FilteredColumnarBatch batch = iter.next();
+ CloseableIterator<Row> rowIter = batch.getRows();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ scanFiles.add(RowSerDe.serializeRowToJson(row));
+ }
+ }
+ locationConstraints = configureLocationConstraints(appCtx);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ distributeFiles();
+ }
+
+ private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+
+ String[] locations = csm.getClusterLocations().getLocations();
+ if (scanFiles.isEmpty()) {
+ return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+ } else if (locations.length > scanFiles.size()) {
+ LOGGER.debug(
+ "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
+ locations.length, scanFiles.size());
+ final String[] locationCopy = locations.clone();
+ ArrayUtils.shuffle(locationCopy);
+ locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations);
+ }
+
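+    // Assign scan files to compute partitions round-robin; e.g. with 5 files and
+    // 2 partitions, partition 0 reads files {0, 2, 4} and partition 1 reads {1, 3}.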
+ private void distributeFiles() {
+ final int numComputePartitions = getPartitionConstraint().getLocations().length;
+ schedule = new HashMap<>();
+ for (int i = 0; i < numComputePartitions; i++) {
+ schedule.put(i, new ArrayList<>());
+ }
+ int i = 0;
+ for (String scanFile : scanFiles) {
+ schedule.get(i).add(scanFile);
+ i = (i + 1) % numComputePartitions;
+ }
+ }
+
+ @Override
+ public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+ try {
+ int partition = context.getPartition();
+ return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() throws AsterixException {
+ return Row.class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
new file mode 100644
index 0000000..13c3d0c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BasePrimitiveType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.FieldMetadata;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Serialize and deserialize Delta data types {@link DataType} to JSON and from JSON class based on
+ * the <a
+ * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types">serialization
+ * rules </a> outlined in the Delta Protocol.
+ */
+public class DataTypeJsonSerDe {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .registerModule(new SimpleModule().addSerializer(StructType.class, new StructTypeSerializer()));
+
+ private DataTypeJsonSerDe() {
+ }
+
+ /**
+ * Serializes a {@link DataType} to a JSON string according to the Delta Protocol. TODO: this
+ * API was only added because of the Flink-Kernel dependency; Flink-Kernel currently uses the
+ * Kernel DataType.toJson and the Standalone DataType.fromJson to convert between types.
+ *
+ * @param dataType
+ * @return JSON string representing the data type
+ */
+ public static String serializeDataType(DataType dataType) {
+ try {
+ StringWriter stringWriter = new StringWriter();
+ JsonGenerator generator = OBJECT_MAPPER.createGenerator(stringWriter);
+ writeDataType(generator, dataType);
+ generator.flush();
+ return stringWriter.toString();
+ } catch (IOException ex) {
+ throw new KernelException("Could not serialize DataType to JSON", ex);
+ }
+ }
+
+ /**
+ * Deserializes a JSON string representing a Delta data type to a {@link DataType}.
+ *
+ * @param structTypeJson JSON string representing a {@link StructType} data type
+ */
+ public static StructType deserializeStructType(String structTypeJson) {
+ try {
+ DataType parsedType = parseDataType(OBJECT_MAPPER.reader().readTree(structTypeJson), "" /* fieldPath */,
+ new FieldMetadata.Builder().build() /* collationsMetadata */);
+ if (parsedType instanceof StructType) {
+ return (StructType) parsedType;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not parse the following JSON as a valid StructType:\n%s", structTypeJson));
+ }
+ } catch (JsonProcessingException ex) {
+ throw new KernelException(format("Could not parse schema given as JSON string: %s", structTypeJson), ex);
+ }
+ }
+
+ /**
+ * Parses a Delta data type from JSON. Data types can either be serialized as strings (for
+ * primitive types) or as objects (for complex types).
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * // Map type field is serialized as:
+ * {
+ * "name" : "f",
+ * "type" : {
+ * "type" : "map",
+ * "keyType" : "string",
+ * "valueType" : "string",
+ * "valueContainsNull" : true
+ * },
+ * "nullable" : true,
+ * "metadata" : { }
+ * }
+ *
+ * // Integer type field serialized as:
+ * {
+ * "name" : "a",
+ * "type" : "integer",
+ * "nullable" : false,
+ * "metadata" : { }
+ * }
+ *
+ * // Collated string type field serialized as:
+ * {
+ * "name" : "s",
+ * "type" : "string",
+ * "nullable", false,
+ * "metadata" : {
+ * "__COLLATIONS": { "s": "ICU.de_DE" }
+ * }
+ * }
+ *
+ * // Array with collated strings field serialized as:
+ * {
+ * "name" : "arr",
+ * "type" : {
+ * "type" : "array",
+ * "elementType" : "string",
+ * "containsNull" : false
+ * },
+ * "nullable" : false,
+ * "metadata" : {
+ * "__COLLATIONS": { "arr.element": "ICU.de_DE" }
+ * }
+ * }
+ * </pre>
+ *
+ * @param fieldPath Path from the nearest ancestor that is of the {@link StructField} type. For
+ * example, "c1.key.element" represents a path starting from the {@link StructField} named
+ * "c1." The next element, "key," indicates that "c1" stores a {@link MapType} type. The final
+ * element, "element", shows that the key of the map is an {@link ArrayType} type.
+ * @param collationsMetadata Metadata that maps the path of a {@link StringType} to its collation.
+ * Only maps non-UTF8_BINARY collated {@link StringType}. Collation metadata is stored in the
+ * nearest ancestor, which is the StructField. This is because StructField includes a metadata
+ * field, whereas Map and Array do not, making them unable to store this information. Paths
+ * are in the same form as `fieldPath`. <a
+ * href="https://github.com/delta-io/delta/blob/master/protocol_rfcs/collated-string-type.md#collation-identifiers">Docs</a>
+ */
+ static DataType parseDataType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ switch (json.getNodeType()) {
+ case STRING:
+ // simple types are stored as just a string
+ return nameToType(json.textValue(), fieldPath, collationsMetadata);
+ case OBJECT:
+ // complex types (array, map, or struct) are stored as JSON objects
+ String type = getStringField(json, "type");
+ switch (type) {
+ case "struct":
+ assertValidTypeForCollations(fieldPath, "struct", collationsMetadata);
+ return parseStructType(json);
+ case "array":
+ assertValidTypeForCollations(fieldPath, "array", collationsMetadata);
+ return parseArrayType(json, fieldPath, collationsMetadata);
+ case "map":
+ assertValidTypeForCollations(fieldPath, "map", collationsMetadata);
+ return parseMapType(json, fieldPath, collationsMetadata);
+ // No default case here; fall through to the following error when no match
+ }
+ default:
+ throw new IllegalArgumentException(
+ String.format("Could not parse the following JSON as a valid Delta data type:\n%s", json));
+ }
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#array-type">array
+ * type </a>
+ */
+ private static ArrayType parseArrayType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ checkArgument(json.isObject() && json.size() == 3,
+ "Expected JSON object with 3 fields for array data type but got:\n%s", json);
+ boolean containsNull = getBooleanField(json, "containsNull");
+ DataType dataType =
+ parseDataType(getNonNullField(json, "elementType"), fieldPath + ".element", collationsMetadata);
+ return new ArrayType(dataType, containsNull);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#map-type">map type
+ * </a>
+ */
+ private static MapType parseMapType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+ checkArgument(json.isObject() && json.size() == 4,
+ "Expected JSON object with 4 fields for map data type but got:\n%s", json);
+ boolean valueContainsNull = getBooleanField(json, "valueContainsNull");
+ DataType keyType = parseDataType(getNonNullField(json, "keyType"), fieldPath + ".key", collationsMetadata);
+ DataType valueType =
+ parseDataType(getNonNullField(json, "valueType"), fieldPath + ".value", collationsMetadata);
+ return new MapType(keyType, valueType, valueContainsNull);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-type">
+ * struct type </a>
+ */
+ private static StructType parseStructType(JsonNode json) {
+ checkArgument(json.isObject() && json.size() == 2,
+ "Expected JSON object with 2 fields for struct data type but got:\n%s", json);
+ JsonNode fieldsNode = getNonNullField(json, "fields");
+ checkArgument(fieldsNode.isArray(), "Expected array for fieldName=%s in:\n%s", "fields", json);
+ Iterator<JsonNode> fields = fieldsNode.elements();
+ List<StructField> parsedFields = new ArrayList<>();
+ while (fields.hasNext()) {
+ parsedFields.add(parseStructField(fields.next()));
+ }
+ return new StructType(parsedFields);
+ }
+
+ /**
+ * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field">
+ * struct field </a>
+ */
+ private static StructField parseStructField(JsonNode json) {
+ checkArgument(json.isObject(), "Expected JSON object for struct field");
+ String name = getStringField(json, "name");
+ FieldMetadata metadata = parseFieldMetadata(json.get("metadata"), false);
+ DataType type = parseDataType(getNonNullField(json, "type"), name, FieldMetadata.empty() /* collationsMetadata */);
+ boolean nullable = getBooleanField(json, "nullable");
+ return new StructField(name, type, nullable, metadata);
+ }
+
+ /** Parses a {@link FieldMetadata}. */
+ private static FieldMetadata parseFieldMetadata(JsonNode json) {
+ return parseFieldMetadata(json, true);
+ }
+
+ /**
+ * Parses a {@link FieldMetadata}. The `includeCollationsMetadata` flag is currently
+ * unused because collated string types are not supported by this reader.
+ */
+ private static FieldMetadata parseFieldMetadata(JsonNode json, boolean includeCollationsMetadata) {
+ if (json == null || json.isNull()) {
+ return FieldMetadata.empty();
+ }
+
+ checkArgument(json.isObject(), "Expected JSON object for struct field metadata");
+ final Iterator<Map.Entry<String, JsonNode>> iterator = json.fields();
+ final FieldMetadata.Builder builder = FieldMetadata.builder();
+ while (iterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iterator.next();
+ JsonNode value = entry.getValue();
+ String key = entry.getKey();
+
+ if (value.isNull()) {
+ builder.putNull(key);
+ } else if (value.isIntegralNumber()) { // covers both int and long
+ builder.putLong(key, value.longValue());
+ } else if (value.isDouble()) {
+ builder.putDouble(key, value.doubleValue());
+ } else if (value.isBoolean()) {
+ builder.putBoolean(key, value.booleanValue());
+ } else if (value.isTextual()) {
+ builder.putString(key, value.textValue());
+ } else if (value.isObject()) {
+ builder.putFieldMetadata(key, parseFieldMetadata(value));
+ } else if (value.isArray()) {
+ final Iterator<JsonNode> fields = value.elements();
+ if (!fields.hasNext()) {
+ // If it is an empty array, we cannot infer its element type.
+ // We put an empty Array[Long].
+ builder.putLongArray(key, new Long[0]);
+ } else {
+ final JsonNode head = fields.next();
+ if (head.isInt()) {
+ builder.putLongArray(key,
+ buildList(value, node -> (long) node.intValue()).toArray(new Long[0]));
+ } else if (head.isDouble()) {
+ builder.putDoubleArray(key, buildList(value, JsonNode::doubleValue).toArray(new Double[0]));
+ } else if (head.isBoolean()) {
+ builder.putBooleanArray(key, buildList(value, JsonNode::booleanValue).toArray(new Boolean[0]));
+ } else if (head.isTextual()) {
+ builder.putStringArray(key, buildList(value, JsonNode::textValue).toArray(new String[0]));
+ } else if (head.isObject()) {
+ builder.putFieldMetadataArray(key,
+ buildList(value, DataTypeJsonSerDe::parseFieldMetadata).toArray(new FieldMetadata[0]));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported type for Array as field metadata value: %s", value));
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unsupported type for field metadata value: %s", value));
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * For an array JSON node builds a {@link List} using the provided {@code accessor} for each
+ * element.
+ */
+ private static <T> List<T> buildList(JsonNode json, Function<JsonNode, T> accessor) {
+ List<T> result = new ArrayList<>();
+ Iterator<JsonNode> elements = json.elements();
+ while (elements.hasNext()) {
+ result.add(accessor.apply(elements.next()));
+ }
+ return result;
+ }
+
+ private static final String FIXED_DECIMAL_REGEX = "decimal\\(\\s*(\\d+)\\s*,\\s*(\\-?\\d+)\\s*\\)";
+ private static final Pattern FIXED_DECIMAL_PATTERN = Pattern.compile(FIXED_DECIMAL_REGEX);
+
+ /** Parses primitive string type names to a {@link DataType} */
+ private static DataType nameToType(String name, String fieldPath, FieldMetadata collationsMetadata) {
+ if (BasePrimitiveType.isPrimitiveType(name)) {
+ return BasePrimitiveType.createPrimitive(name);
+ } else if (name.equals("decimal")) {
+ return DecimalType.USER_DEFAULT;
+ } else if ("void".equalsIgnoreCase(name)) {
+ // Earlier versions of Delta had VOID type which is not specified in Delta Protocol.
+ // It is not readable or writable. Throw a user-friendly error message.
+ throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+ } else {
+ // decimal has a special pattern with a precision and scale
+ Matcher decimalMatcher = FIXED_DECIMAL_PATTERN.matcher(name);
+ if (decimalMatcher.matches()) {
+ int precision = Integer.parseInt(decimalMatcher.group(1));
+ int scale = Integer.parseInt(decimalMatcher.group(2));
+ return new DecimalType(precision, scale);
+ }
+
+ // We have encountered a type that is beyond the specification of the protocol
+ // checks. This must be an invalid type (according to protocol) and
+ // not an unsupported data type by Kernel.
+ throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+ }
+ }
+
+ private static JsonNode getNonNullField(JsonNode rootNode, String fieldName) {
+ JsonNode node = rootNode.get(fieldName);
+ if (node == null || node.isNull()) {
+ throw new IllegalArgumentException(
+ String.format("Expected non-null for fieldName=%s in:\n%s", fieldName, rootNode));
+ }
+ return node;
+ }
+
+ private static String getStringField(JsonNode rootNode, String fieldName) {
+ JsonNode node = getNonNullField(rootNode, fieldName);
+ checkArgument(node.isTextual(), "Expected string for fieldName=%s in:\n%s", fieldName, rootNode);
+ return node.textValue(); // safe: isTextual() is verified above
+ }
+
+ private static void assertValidTypeForCollations(String fieldPath, String fieldType,
+ FieldMetadata collationsMetadata) {
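+        // Intentionally a no-op: collated string types are not supported here, so
+        // the collations metadata is accepted but not validated.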
+
+ }
+
+ private static boolean getBooleanField(JsonNode rootNode, String fieldName) {
+ JsonNode node = getNonNullField(rootNode, fieldName);
+ checkArgument(node.isBoolean(), "Expected boolean for fieldName=%s in:\n%s", fieldName, rootNode);
+ return node.booleanValue();
+ }
+
+ protected static class StructTypeSerializer extends StdSerializer<StructType> {
+ private static final long serialVersionUID = 1L;
+
+ public StructTypeSerializer() {
+ super(StructType.class);
+ }
+
+ @Override
+ public void serialize(StructType structType, JsonGenerator gen, SerializerProvider provider)
+ throws IOException {
+ writeDataType(gen, structType);
+ }
+ }
+
+ private static void writeDataType(JsonGenerator gen, DataType dataType) throws IOException {
+ if (dataType instanceof StructType) {
+ writeStructType(gen, (StructType) dataType);
+ } else if (dataType instanceof ArrayType) {
+ writeArrayType(gen, (ArrayType) dataType);
+ } else if (dataType instanceof MapType) {
+ writeMapType(gen, (MapType) dataType);
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dataType;
+ gen.writeString(format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale()));
+ } else {
+ gen.writeString(dataType.toString());
+ }
+ }
+
+ private static void writeArrayType(JsonGenerator gen, ArrayType arrayType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "array");
+ gen.writeFieldName("elementType");
+ writeDataType(gen, arrayType.getElementType());
+ gen.writeBooleanField("containsNull", arrayType.containsNull());
+ gen.writeEndObject();
+ }
+
+ private static void writeMapType(JsonGenerator gen, MapType mapType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "map");
+ gen.writeFieldName("keyType");
+ writeDataType(gen, mapType.getKeyType());
+ gen.writeFieldName("valueType");
+ writeDataType(gen, mapType.getValueType());
+ gen.writeBooleanField("valueContainsNull", mapType.isValueContainsNull());
+ gen.writeEndObject();
+ }
+
+ private static void writeStructType(JsonGenerator gen, StructType structType) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("type", "struct");
+ gen.writeArrayFieldStart("fields");
+ for (StructField field : structType.fields()) {
+ writeStructField(gen, field);
+ }
+ gen.writeEndArray();
+ gen.writeEndObject();
+ }
+
+ private static void writeStructField(JsonGenerator gen, StructField field) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("name", field.getName());
+ gen.writeFieldName("type");
+ writeDataType(gen, field.getDataType());
+ gen.writeBooleanField("nullable", field.isNullable());
+ gen.writeFieldName("metadata");
+ writeFieldMetadata(gen, field.getMetadata());
+ gen.writeEndObject();
+ }
+
+ private static void writeFieldMetadata(JsonGenerator gen, FieldMetadata metadata) throws IOException {
+ gen.writeStartObject();
+ for (Map.Entry<String, Object> entry : metadata.getEntries().entrySet()) {
+ gen.writeFieldName(entry.getKey());
+ Object value = entry.getValue();
+ if (value instanceof Long) {
+ gen.writeNumber((Long) value);
+ } else if (value instanceof Double) {
+ gen.writeNumber((Double) value);
+ } else if (value instanceof Boolean) {
+ gen.writeBoolean((Boolean) value);
+ } else if (value instanceof String) {
+ gen.writeString((String) value);
+ } else if (value instanceof FieldMetadata) {
+ writeFieldMetadata(gen, (FieldMetadata) value);
+ } else if (value instanceof Long[]) {
+ gen.writeStartArray();
+ for (Long v : (Long[]) value) {
+ gen.writeNumber(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof Double[]) {
+ gen.writeStartArray();
+ for (Double v : (Double[]) value) {
+ gen.writeNumber(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof Boolean[]) {
+ gen.writeStartArray();
+ for (Boolean v : (Boolean[]) value) {
+ gen.writeBoolean(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof String[]) {
+ gen.writeStartArray();
+ for (String v : (String[]) value) {
+ gen.writeString(v);
+ }
+ gen.writeEndArray();
+ } else if (value instanceof FieldMetadata[]) {
+ gen.writeStartArray();
+ for (FieldMetadata v : (FieldMetadata[]) value) {
+ writeFieldMetadata(gen, v);
+ }
+ gen.writeEndArray();
+ } else if (value == null) {
+ gen.writeNull();
+ } else {
+ throw new IllegalArgumentException(format("Unsupported type for field metadata value: %s", value));
+ }
+ }
+ gen.writeEndObject();
+ }
+}
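
A minimal round-trip sketch of the serializer above (illustrative only; it assumes
the kernel type singletons IntegerType.INTEGER and StringType.STRING from
io.delta.kernel.types):

    StructType schema = new StructType()
            .add(new StructField("id", IntegerType.INTEGER, true))
            .add(new StructField("name", StringType.STRING, true));
    String json = DataTypeJsonSerDe.serializeDataType(schema);
    // json: {"type":"struct","fields":[{"name":"id","type":"integer",...},...]}
    StructType roundTripped = DataTypeJsonSerDe.deserializeStructType(json);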
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
new file mode 100644
index 0000000..558f8a9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+/**
+ * Delta record reader.
+ * The reader returns records in Delta Kernel Row format.
+ */
+public class DeltaFileRecordReader implements IRecordReader<Row> {
+
+ private Engine engine;
+ private List<Row> scanFiles;
+ private Row scanState;
+ protected IRawRecord<Row> record;
+ protected VoidPointable value = null;
+ private FileStatus fileStatus;
+ private StructType physicalReadSchema;
+ private CloseableIterator<ColumnarBatch> physicalDataIter;
+ private CloseableIterator<FilteredColumnarBatch> dataIter;
+ private int fileIndex;
+ private Row scanFile;
+ private CloseableIterator<Row> rows;
+
+ public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
+ IExternalDataRuntimeContext context) {
+ Configuration config = new Configuration();
+ config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+ config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+ config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+ }
+ config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
+ String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
+ if (serviceEndpoint != null) {
+ config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+ }
+ this.engine = DefaultEngine.create(config);
+ this.scanFiles = new ArrayList<>();
+ for (String scanFile : serScanFiles) {
+ this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
+ }
+ this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+ this.fileStatus = null;
+ this.physicalReadSchema = null;
+ this.physicalDataIter = null;
+ this.dataIter = null;
+ this.record = new GenericRecord<>();
+ if (!scanFiles.isEmpty()) {
+ this.fileIndex = 0;
+ this.scanFile = scanFiles.get(0);
+ this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+ this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+ try {
+ this.physicalDataIter = engine.getParquetHandler()
+ .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+ this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ if (dataIter.hasNext()) {
+ rows = dataIter.next().getRows();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (dataIter != null) {
+ dataIter.close();
+ }
+ if (physicalDataIter != null) {
+ physicalDataIter.close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ if (rows != null && rows.hasNext()) {
+ return true;
+ } else if (dataIter != null && dataIter.hasNext()) {
+ rows = dataIter.next().getRows();
+ return this.hasNext();
+ } else if (fileIndex < scanFiles.size() - 1) {
+ fileIndex++;
+ scanFile = scanFiles.get(fileIndex);
+ fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+ physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+ physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+ physicalReadSchema, Optional.empty());
+ dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ return this.hasNext();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public IRawRecord<Row> next() throws IOException, InterruptedException {
+ Row row = rows.next();
+ record.set(row);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+
+ }
+
+ @Override
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+}
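
A sketch of how this reader is driven (hypothetical driver loop; files, state,
conf, and ctx stand in for the arguments that AwsS3DeltaReaderFactory.createRecordReader
passes in):

    IRecordReader<Row> reader = new DeltaFileRecordReader(files, state, conf, ctx);
    try {
        while (reader.hasNext()) {               // advances across batches and files
            IRawRecord<Row> rec = reader.next(); // one kernel Row per call
            // rec is handed to DeltaDataParser.parse(...)
        }
    } finally {
        reader.close();
    }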
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
new file mode 100644
index 0000000..d91cd9a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} objects.
+ */
+public class RowSerDe {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private RowSerDe() {
+ }
+
+ /**
+ * Utility method to serialize a {@link Row} as a JSON string
+ */
+ public static String serializeRowToJson(Row row) {
+ Map<String, Object> rowObject = convertRowToJsonObject(row);
+ try {
+ Map<String, Object> rowWithSchema = new HashMap<>();
+ rowWithSchema.put("schema", row.getSchema().toJson());
+ rowWithSchema.put("row", rowObject);
+ return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+ } catch (JsonProcessingException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Utility method to deserialize a {@link Row} object from the JSON form.
+ */
+ public static Row deserializeRowFromJson(String jsonRowWithSchema) {
+ try {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+ JsonNode schemaNode = jsonNode.get("schema");
+ StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaNode.asText());
+ return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
+ } catch (JsonProcessingException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ private static Map<String, Object> convertRowToJsonObject(Row row) {
+ StructType rowType = row.getSchema();
+ Map<String, Object> rowObject = new HashMap<>();
+ for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+ StructField field = rowType.at(fieldId);
+ DataType fieldType = field.getDataType();
+ String name = field.getName();
+
+ if (row.isNullAt(fieldId)) {
+ rowObject.put(name, null);
+ continue;
+ }
+
+ Object value;
+ if (fieldType instanceof BooleanType) {
+ value = row.getBoolean(fieldId);
+ } else if (fieldType instanceof ByteType) {
+ value = row.getByte(fieldId);
+ } else if (fieldType instanceof ShortType) {
+ value = row.getShort(fieldId);
+ } else if (fieldType instanceof IntegerType) {
+ value = row.getInt(fieldId);
+ } else if (fieldType instanceof LongType) {
+ value = row.getLong(fieldId);
+ } else if (fieldType instanceof FloatType) {
+ value = row.getFloat(fieldId);
+ } else if (fieldType instanceof DoubleType) {
+ value = row.getDouble(fieldId);
+ } else if (fieldType instanceof DateType) {
+ value = row.getInt(fieldId);
+ } else if (fieldType instanceof TimestampType) {
+ value = row.getLong(fieldId);
+ } else if (fieldType instanceof StringType) {
+ value = row.getString(fieldId);
+ } else if (fieldType instanceof ArrayType) {
+ value = VectorUtils.toJavaList(row.getArray(fieldId));
+ } else if (fieldType instanceof MapType) {
+ value = VectorUtils.toJavaMap(row.getMap(fieldId));
+ } else if (fieldType instanceof StructType) {
+ Row subRow = row.getStruct(fieldId);
+ value = convertRowToJsonObject(subRow);
+ } else {
+ throw new UnsupportedOperationException("NYI");
+ }
+
+ rowObject.put(name, value);
+ }
+
+ return rowObject;
+ }
+
+ private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType) {
+ return new DefaultJsonRow(rowJsonNode, rowType);
+ }
+}
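
A round-trip sketch of the RowSerDe API (the row value is assumed to come from a
kernel scan, e.g. scan.getScanState(engine)):

    String json = RowSerDe.serializeRowToJson(row);   // {"schema": "...", "row": {...}}
    Row copy = RowSerDe.deserializeRowFromJson(json); // DefaultJsonRow with the same schema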
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
new file mode 100644
index 0000000..81e465c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeltaConverterContext extends ParserContext {
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+ private final boolean decimalToDouble;
+ private final boolean timestampAsLong;
+ private final boolean dateAsInt;
+
+ private final int timeZoneOffset;
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+ public DeltaConverterContext(Map<String, String> configuration) {
+ decimalToDouble = Boolean.parseBoolean(configuration
+ .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+ timestampAsLong = Boolean.parseBoolean(configuration
+ .getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE));
+ dateAsInt = Boolean.parseBoolean(
+ configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, ExternalDataConstants.TRUE));
+ String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+ timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
+ }
+ }
+
+ public void serializeDate(int value, DataOutput output) {
+ try {
+ mutableDate.setValue(value);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ public boolean isTimestampAsLong() {
+ return timestampAsLong;
+ }
+
+ public boolean isDateAsInt() {
+ return dateAsInt;
+ }
+}
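
A sketch of the reader options this context consumes (keys are the
ExternalDataConstants.DeltaOptions constants referenced above; values illustrative):

    Map<String, String> conf = new HashMap<>();
    conf.put(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, "true");  // default: false
    conf.put(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, "false"); // default: true
    conf.put(ExternalDataConstants.DeltaOptions.TIMEZONE, "America/Los_Angeles");
    DeltaConverterContext ctx = new DeltaConverterContext(conf);
    // With these settings decimals widen to double, timestamps serialize as
    // ADateTime shifted by the zone's raw offset, and dates stay epoch-day ints.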
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
new file mode 100644
index 0000000..ea02d77
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> {
+ private final DeltaConverterContext parserContext;
+ private final IExternalFilterValueEmbedder valueEmbedder;
+
+ public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
+ parserContext = new DeltaConverterContext(conf);
+ valueEmbedder = context.getValueEmbedder();
+ }
+
+ @Override
+ public boolean parse(IRawRecord<? extends Row> record, DataOutput out) throws HyracksDataException {
+ try {
+ parseObject(record.get(), out);
+ valueEmbedder.reset();
+ return true;
+ } catch (IOException e) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
+ }
+
+ private void parseObject(Row record, DataOutput out) throws IOException {
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ StructType schema = record.getSchema();
+ valueEmbedder.enterObject();
+ for (int i = 0; i < schema.fields().size(); i++) {
+ DataType fieldSchema = schema.fields().get(i).getDataType();
+ String fieldName = schema.fieldNames().get(i);
+ ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+ IValueReference value = null;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldSchema, record, i, valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+ objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseObject(StructType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ valueEmbedder.enterObject();
+ for (int i = 0; i < schema.fields().size(); i++) {
+ DataType fieldSchema = schema.fields().get(i).getDataType();
+ String fieldName = schema.fieldNames().get(i);
+ ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+ IValueReference value = null;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldSchema, column.getChild(i), index, valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+ objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseArray(ArrayType arraySchema, ArrayValue arrayValue, DataOutput out) throws IOException {
+ DataType elementSchema = arraySchema.getElementType();
+ ColumnVector elements = arrayValue.getElements();
+ final IMutableValueStorage valueBuffer = parserContext.enterCollection();
+ final IAsterixListBuilder arrayBuilder =
+ parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+ for (int i = 0; i < elements.getSize(); i++) {
+ valueBuffer.reset();
+ parseValue(elementSchema, elements, i, valueBuffer.getDataOutput());
+ arrayBuilder.addItem(valueBuffer);
+ }
+ arrayBuilder.write(out, true);
+ parserContext.exitCollection(valueBuffer, arrayBuilder);
+ }
+
+ private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+ if (isNull) {
+ return ATypeTag.NULL;
+ }
+ if (schema instanceof BooleanType) {
+ return ATypeTag.BOOLEAN;
+ } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) {
+ return ATypeTag.BIGINT;
+ } else if (schema instanceof DoubleType) {
+ return ATypeTag.DOUBLE;
+ } else if (schema instanceof StringType) {
+ return ATypeTag.STRING;
+ } else if (schema instanceof DateType) {
+ if (parserContext.isDateAsInt()) {
+ return ATypeTag.INTEGER;
+ }
+ return ATypeTag.DATE;
+ } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) {
+ if (parserContext.isTimestampAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.DATETIME;
+ } else if (schema instanceof BinaryType) {
+ return ATypeTag.BINARY;
+ } else if (schema instanceof ArrayType) {
+ return ATypeTag.ARRAY;
+ } else if (schema instanceof StructType) {
+ return ATypeTag.OBJECT;
+ } else if (schema instanceof DecimalType) {
+ ensureDecimalToDoubleEnabled(schema, parserContext);
+ return ATypeTag.DOUBLE;
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
+ private void parseValue(DataType schema, Row row, int index, DataOutput out) throws IOException {
+ if (row.isNullAt(index)) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (schema instanceof BooleanType) {
+ if (row.getBoolean(index)) {
+ booleanSerde.serialize(ABoolean.TRUE, out);
+ } else {
+ booleanSerde.serialize(ABoolean.FALSE, out);
+ }
+ } else if (schema instanceof ShortType) {
+ serializeLong(row.getShort(index), out);
+ } else if (schema instanceof IntegerType) {
+ serializeLong(row.getInt(index), out);
+ } else if (schema instanceof LongType) {
+ serializeLong(row.getLong(index), out);
+ } else if (schema instanceof DoubleType) {
+ serializeDouble(row.getDouble(index), out);
+ } else if (schema instanceof StringType) {
+ serializeString(row.getString(index), out);
+ } else if (schema instanceof DateType) {
+ if (parserContext.isDateAsInt()) {
+ serializeLong(row.getInt(index), out);
+ } else {
+ parserContext.serializeDate(row.getInt(index), out);
+ }
+ } else if (schema instanceof TimestampType) {
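+            // Delta stores TIMESTAMP values as microseconds since the Unix epoch;
+            // convert to milliseconds and apply the configured time-zone offset.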
+ long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+ int offset = parserContext.getTimeZoneOffset();
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis + offset, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis + offset, out);
+ }
+ } else if (schema instanceof TimestampNTZType) {
+ long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis, out);
+ }
+ } else if (schema instanceof StructType) {
+ parseObject(row.getStruct(index), out);
+ } else if (schema instanceof ArrayType) {
+ parseArray((ArrayType) schema, row.getArray(index), out);
+ } else if (schema instanceof DecimalType) {
+ serializeDecimal(row.getDecimal(index), out);
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
+ private void parseValue(DataType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+ if (column.isNullAt(index)) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else if (schema instanceof BooleanType) {
+ if (column.getBoolean(index)) {
+ booleanSerde.serialize(ABoolean.TRUE, out);
+ } else {
+ booleanSerde.serialize(ABoolean.FALSE, out);
+ }
+ } else if (schema instanceof ShortType) {
+ serializeLong(column.getShort(index), out);
+ } else if (schema instanceof IntegerType) {
+ serializeLong(column.getInt(index), out);
+ } else if (schema instanceof LongType) {
+ serializeLong(column.getLong(index), out);
+ } else if (schema instanceof DoubleType) {
+ serializeDouble(column.getDouble(index), out);
+ } else if (schema instanceof StringType) {
+ serializeString(column.getString(index), out);
+ } else if (schema instanceof DateType) {
+ if (parserContext.isDateAsInt()) {
+ serializeLong(column.getInt(index), out);
+ } else {
+ parserContext.serializeDate(column.getInt(index), out);
+ }
+ } else if (schema instanceof TimestampType) {
+ long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+ int offset = parserContext.getTimeZoneOffset();
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis + offset, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis + offset, out);
+ }
+ } else if (schema instanceof TimestampNTZType) {
+ long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis, out);
+ }
+ } else if (schema instanceof ArrayType) {
+ parseArray((ArrayType) schema, column.getArray(index), out);
+ } else if (schema instanceof StructType) {
+ parseObject((StructType) schema, column, index, out);
+ } else if (schema instanceof DecimalType) {
+ serializeDecimal(column.getDecimal(index), out);
+ } else {
+ throw createUnsupportedException(schema);
+ }
+ }
+
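+ // All integral Delta types (short, int, long) are widened to a 64-bit AsterixDB integer.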
+ private void serializeLong(Object value, DataOutput out) throws HyracksDataException {
+ long longValue = ((Number) value).longValue();
+ aInt64.setValue(longValue);
+ int64Serde.serialize(aInt64, out);
+ }
+
+ private void serializeDouble(Object value, DataOutput out) throws HyracksDataException {
+ double doubleValue = ((Number) value).doubleValue();
+ aDouble.setValue(doubleValue);
+ doubleSerde.serialize(aDouble, out);
+ }
+
+ private void serializeString(Object value, DataOutput out) throws HyracksDataException {
+ aString.setValue(value.toString());
+ stringSerde.serialize(aString, out);
+ }
+
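+ // Decimals are emitted as doubles, which may lose precision; this path is gated by the
+ // decimal-to-double option (see ensureDecimalToDoubleEnabled).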
+ private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException {
+ serializeDouble(value.doubleValue(), out);
+ }
+
+ private static HyracksDataException createUnsupportedException(DataType schema) {
+ return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString());
+ }
+
+ private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context)
+ throws RuntimeDataException {
+ if (!context.isDecimalToDoubleEnabled()) {
+ throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+ ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
new file mode 100644
index 0000000..7f28105
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+
+import io.delta.kernel.data.Row;
+
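+ /**
+ * Data parser factory for Delta tables. Produces {@link DeltaDataParser} instances that
+ * consume Delta Kernel {@link Row} records; stream-based parsing is not supported.
+ */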
+public class DeltaTableDataParserFactory extends AbstractGenericDataParserFactory<Row> {
+
+ private static final long serialVersionUID = 1L;
+ private static final List<String> PARSER_FORMAT = List.of(ExternalDataConstants.FORMAT_DELTA);
+
+ @Override
+ public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+ throw new UnsupportedOperationException("Stream parsing is not supported for Delta tables");
+ }
+
+ @Override
+ public void setMetaType(ARecordType metaType) {
+ // no MetaType to set.
+ }
+
+ @Override
+ public List<String> getParserFormats() {
+ return PARSER_FORMAT;
+ }
+
+ @Override
+ public IRecordDataParser<Row> createRecordParser(IExternalDataRuntimeContext context) {
+ return createParser(context);
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return Row.class;
+ }
+
+ private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
+ return new DeltaDataParser(context, configuration);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index 97b04a4..5f0c018 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaFileRecordReader;
import org.apache.asterix.external.input.record.reader.stream.AvroRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -111,8 +112,9 @@
if (format != null) {
if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_AVRO)) {
return AvroRecordReader.class;
- }
- if (recordReaders.containsKey(format)) {
+ } else if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_DELTA)) {
+ return DeltaFileRecordReader.class;
+ } else if (recordReaders.containsKey(format)) {
return findRecordReaderClazzWithConfig(configuration, format);
}
throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 2bc65f2..202e131 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -20,7 +20,6 @@
import java.util.List;
import java.util.Set;
-import java.util.TimeZone;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -390,6 +389,16 @@
WRITER_SUPPORTED_QUOTES = List.of(DEFAULT_QUOTE, DEFAULT_SINGLE_QUOTE, NONE);
}
+ public static class DeltaOptions {
+ private DeltaOptions() {
+ }
+
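+ // Reading options recognized for Delta tables; values are consumed by the
+ // DeltaConverterContext when records are parsed.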
+ public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+ public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+ public static final String DATE_AS_INT = "date-to-int";
+ public static final String TIMEZONE = "timezone";
+ }
+
public static class ParquetOptions {
private ParquetOptions() {
}
@@ -419,10 +428,5 @@
*/
public static final String TIMEZONE = "timezone";
public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + TIMEZONE;
-
- /**
- * Valid time zones that are supported by Java
- */
- public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6ba618d..950f3e2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -49,6 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -109,11 +111,9 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
-import io.delta.standalone.DeltaLog;
-import io.delta.standalone.Snapshot;
-import io.delta.standalone.actions.AddFile;
-
public class ExternalDataUtils {
+
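+ /** All time zone IDs known to the JVM; used to validate the Parquet and Delta timezone options. */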
+ private static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
private static final int HEADER_FUDGE = 64;
@@ -476,8 +476,8 @@
if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
if (isDeltaTable(configuration)) {
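+ // Route Delta tables to the dedicated Delta parser and reader instead of the Parquet path.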
- configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
- configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_DELTA);
}
prepareTableFormat(configuration);
}
@@ -499,23 +499,11 @@
throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
configuration.get(ExternalDataConstants.KEY_FORMAT));
}
- }
-
- public static void prepareDeltaTableFormat(Map<String, String> configuration, Configuration conf,
- String tableMetadataPath) {
- DeltaLog deltaLog = DeltaLog.forTable(conf, tableMetadataPath);
- Snapshot snapshot = deltaLog.snapshot();
- List<AddFile> dataFiles = snapshot.getAllFiles();
- StringBuilder builder = new StringBuilder();
- for (AddFile batchFile : dataFiles) {
- builder.append(",");
- String path = batchFile.getPath();
- builder.append(tableMetadataPath).append('/').append(path);
+ if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
+ && !VALID_TIME_ZONES.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) {
+ throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
+ configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
}
- if (builder.length() > 0) {
- builder.deleteCharAt(0);
- }
- configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());
}
public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
@@ -964,7 +952,7 @@
if (datasetRecordType.getFieldTypes().length != 0) {
throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
} else if (properties.containsKey(ParquetOptions.TIMEZONE)
- && !ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
+ && !VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
//Ensure the configured time zone id is correct
throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 50c75ff..5274c44 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -103,14 +103,17 @@
try {
builder.setCredentials(GoogleCredentials.getApplicationDefault());
- } catch (IOException ex) {
+ } catch (Exception ex) {
throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
}
} else if (jsonCredentials != null) {
try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
} catch (IOException ex) {
- throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ } catch (Exception ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex,
+ "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid.");
}
} else {
builder.setCredentials(NoCredentials.getInstance());
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index bbf5195..793516c 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -22,4 +22,5 @@
org.apache.asterix.external.parser.factory.RSSParserFactory
org.apache.asterix.external.parser.factory.TweetParserFactory
org.apache.asterix.external.parser.factory.NoOpDataParserFactory
-org.apache.asterix.external.parser.factory.AvroDataParserFactory
\ No newline at end of file
+org.apache.asterix.external.parser.factory.AvroDataParserFactory
+org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
\ No newline at end of file