[ASTERIXDB-3602][EXT] Reading Delta tables with ISO8601 formatted timestamp partition
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-66229
Change-Id: I2cf9e532882dad8c778b43d72f3f1bcab37d5a00
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19693
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index 4080f4c..d31fb66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -449,6 +449,9 @@
loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table", PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table/_delta_log", JSON_FILTER,
+ "delta-data/");
}
private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 7c8840a..a3b20f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -48,6 +48,7 @@
import io.delta.standalone.types.StringType;
import io.delta.standalone.types.StructField;
import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
public class DeltaTableGenerator {
public static final String DELTA_GEN_BASEDIR = "target" + File.separatorChar + "generated_delta_files";
@@ -63,6 +64,8 @@
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
public static final String DELTA_PARTITIONED_TABLE =
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table";
+ public static final String DELTA_TIMESTAMP_PARTITIONED_TABLE = "target" + File.separatorChar
+ + "generated_delta_files" + File.separatorChar + "timestamp_partitioned_delta_table";
public static void prepareDeltaTableContainer(Configuration conf) {
File basePath = new File(".");
@@ -73,6 +76,7 @@
prepareFileSizeOne(conf);
prepareFileSizeNine(conf);
preparePartitionedTable(conf);
+ prepareTimestampPartitionedTable(conf);
}
public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -498,4 +502,129 @@
}
}
+ public static void prepareTimestampPartitionedTable(Configuration conf) {
+ Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name")
+ .requiredString("timestamp").endRecord();
+ try {
+ List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema),
+ new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileSecondSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileThirdSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileFourthSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+
+ fileFirstSnapshotRecords.get(0).put("id", 0);
+ fileFirstSnapshotRecords.get(0).put("name", "Order 1");
+ fileFirstSnapshotRecords.get(0).put("timestamp", "2025-01-01 00:01:20");
+
+ fileFirstSnapshotRecords.get(1).put("id", 1);
+ fileFirstSnapshotRecords.get(1).put("name", "Order 2");
+ fileFirstSnapshotRecords.get(1).put("timestamp", "2025-01-01 00:01:20.00");
+
+ fileFirstSnapshotRecords.get(2).put("id", 2);
+ fileFirstSnapshotRecords.get(2).put("name", "Order 3");
+ fileFirstSnapshotRecords.get(2).put("timestamp", "2025-01-01 00:01:20.000000");
+
+ fileSecondSnapshotRecords.get(0).put("id", 3);
+ fileSecondSnapshotRecords.get(0).put("name", "Order 10");
+ fileSecondSnapshotRecords.get(0).put("timestamp", "2025-01-01T00:01:30Z");
+
+ fileSecondSnapshotRecords.get(1).put("id", 4);
+ fileSecondSnapshotRecords.get(1).put("name", "Order 11");
+ fileSecondSnapshotRecords.get(1).put("timestamp", "2025-01-01T00T01:30.000000Z");
+
+ fileThirdSnapshotRecords.get(0).put("id", 5);
+ fileThirdSnapshotRecords.get(0).put("name", "Order 21");
+ fileThirdSnapshotRecords.get(0).put("timestamp", "2025-01-02 00:02:20.100");
+
+ fileThirdSnapshotRecords.get(1).put("id", 6);
+ fileThirdSnapshotRecords.get(1).put("name", "Order 22");
+ fileThirdSnapshotRecords.get(1).put("timestamp", "2025-01-02 00:02:20.100");
+
+ fileFourthSnapshotRecords.get(0).put("id", 7);
+ fileFourthSnapshotRecords.get(0).put("name", "Order 30");
+ fileFourthSnapshotRecords.get(0).put("timestamp", "2025-01-02T00:02:30.100000Z");
+
+ fileFourthSnapshotRecords.get(1).put("id", 8);
+ fileFourthSnapshotRecords.get(1).put("name", "Order 31");
+ fileFourthSnapshotRecords.get(1).put("timestamp", "2025-01-02T00:02:30.100000Z");
+
+ Path path = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "firstFile.parquet");
+ ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileFirstSnapshotRecords) {
+ writer.write(record);
+ }
+ long size = writer.getDataSize();
+ writer.close();
+
+ Path path2 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "secondFile.parquet");
+ ParquetWriter<GenericData.Record> writer2 =
+ AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileSecondSnapshotRecords) {
+ writer2.write(record);
+ }
+ long size2 = writer2.getDataSize();
+ writer2.close();
+
+ Path path3 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "thirdFile.parquet");
+ ParquetWriter<GenericData.Record> writer3 =
+ AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileThirdSnapshotRecords) {
+ writer3.write(record);
+ }
+ long size3 = writer3.getDataSize();
+ writer3.close();
+
+ Path path4 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "fourthFile.parquet");
+ ParquetWriter<GenericData.Record> writer4 =
+ AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileFourthSnapshotRecords) {
+ writer4.write(record);
+ }
+ long size4 = writer4.getDataSize();
+ writer4.close();
+
+ DeltaLog log = DeltaLog.forTable(conf, DELTA_TIMESTAMP_PARTITIONED_TABLE);
+ OptimisticTransaction txn = log.startTransaction();
+ Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("timestamp"))
+ .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+ .add(new StructField("name", new StringType(), true))
+ .add(new StructField("timestamp", new TimestampType(), true)))
+ .build();
+
+ Map<String, String> partitionValues = new HashMap<>();
+ partitionValues.put("timestamp", "2025-01-01 00:01:20");
+ List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size,
+ System.currentTimeMillis(), true, null, null));
+ txn.updateMetadata(metaData);
+ txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("timestamp", "2025-01-01T00:01:30Z");
+ actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(),
+ true, null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("timestamp", "2025-01-02 00:02:20.100");
+ actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true,
+ null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("timestamp", "2025-01-02T00:02:30.100000Z");
+ actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(),
+ true, null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
new file mode 100644
index 0000000..da4b0e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Sets up two external collections over the same timestamp-partitioned Delta table.
+ * They differ only in the "timestamp-to-long" option (see DeltalakeDataset2 below).
+ */
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ /* No "timestamp-to-long" option here; the expected results for query 01 list timestamps as numbers. */
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/timestamp_partitioned_delta_table"),
+ ("table-format" = "delta")
+ );
+
+ /* Same table with "timestamp-to-long" = "false"; the expected results for query 02 list datetime values. */
+ CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/timestamp_partitioned_delta_table"),
+ ("table-format" = "delta"),
+ ("timestamp-to-long" = "false")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
new file mode 100644
index 0000000..bfd6581
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ /* Full scan of the collection declared without "timestamp-to-long", ordered by id for a stable result. */
+ SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
new file mode 100644
index 0000000..a84d381
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ /* Filters the "timestamp-to-long" = "false" collection on the partition column using a datetime literal. */
+ SELECT element ds FROM DeltalakeDataset2 as ds where ds.timestamp=datetime("2025-01-01T00:01:20Z") order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm
new file mode 100644
index 0000000..dabfb4f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Order 1", "timestamp": 1735689680000 }
+{ "id": 1, "name": "Order 2", "timestamp": 1735689680000 }
+{ "id": 2, "name": "Order 3", "timestamp": 1735689680000 }
+{ "id": 3, "name": "Order 10", "timestamp": 1735689690000 }
+{ "id": 4, "name": "Order 11", "timestamp": 1735689690000 }
+{ "id": 5, "name": "Order 21", "timestamp": 1735776140100 }
+{ "id": 6, "name": "Order 22", "timestamp": 1735776140100 }
+{ "id": 7, "name": "Order 30", "timestamp": 1735776150100 }
+{ "id": 8, "name": "Order 31", "timestamp": 1735776150100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm
new file mode 100644
index 0000000..c185038
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "name": "Order 1", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 1, "name": "Order 2", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 2, "name": "Order 3", "timestamp": datetime("2025-01-01T00:01:20.000") }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 38ebf01..c9f9da6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -639,6 +639,12 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-timestamp-partitioned-file-read">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-timestamp-partitioned-file-read</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
<placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
new file mode 100644
index 0000000..560360c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Static helpers for the expression evaluator in this package: byte-wise comparators, null propagation,
+ * row-by-row comparison of column vectors, and light-weight wrapper vectors.
+ */
+class DefaultExpressionUtils {
+    private DefaultExpressionUtils() {
+    }
+
+    /** Orders strings by the unsigned value of their UTF-8 bytes; a strict prefix sorts first. */
+    static final Comparator<String> STRING_COMPARATOR = (leftOp, rightOp) -> {
+        byte[] lhs = leftOp.getBytes(StandardCharsets.UTF_8);
+        byte[] rhs = rightOp.getBytes(StandardCharsets.UTF_8);
+        int common = Math.min(lhs.length, rhs.length);
+        for (int pos = 0; pos < common; pos++) {
+            if (lhs[pos] != rhs[pos]) {
+                return Byte.toUnsignedInt(lhs[pos]) - Byte.toUnsignedInt(rhs[pos]);
+            }
+        }
+        return Integer.compare(lhs.length, rhs.length);
+    };
+
+    /** Orders byte arrays by the unsigned value of their bytes; a strict prefix sorts first. */
+    static final Comparator<byte[]> BINARY_COMPARTOR = (leftOp, rightOp) -> {
+        int common = Math.min(leftOp.length, rightOp.length);
+        for (int pos = 0; pos < common; pos++) {
+            if (leftOp[pos] != rightOp[pos]) {
+                return Byte.toUnsignedInt(leftOp[pos]) - Byte.toUnsignedInt(rightOp[pos]);
+            }
+        }
+        return Integer.compare(leftOp.length, rightOp.length);
+    };
+
+    /**
+     * Computes the per-row nullability of a binary operation: a row is null as soon as either operand is null
+     * at that row. The result has one entry per row of {@code left}.
+     */
+    static boolean[] evalNullability(ColumnVector left, ColumnVector right) {
+        boolean[] isNull = new boolean[left.getSize()];
+        for (int row = 0; row < isNull.length; row++) {
+            isNull[row] = left.isNullAt(row) || right.isNullAt(row);
+        }
+        return isNull;
+    }
+
+    /**
+     * Presents {@code childVector} as a boolean {@link ColumnVector}. Size and {@code close()} are taken from
+     * the child; values and nullability come from the supplied accessors.
+     */
+    static ColumnVector booleanWrapperVector(ColumnVector childVector, Function<Integer, Boolean> valueAccessor,
+            Function<Integer, Boolean> nullabilityAccessor) {
+
+        return new ColumnVector() {
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return valueAccessor.apply(rowId);
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return nullabilityAccessor.apply(rowId);
+            }
+
+            @Override
+            public int getSize() {
+                return childVector.getSize();
+            }
+
+            @Override
+            public DataType getDataType() {
+                return BooleanType.BOOLEAN;
+            }
+
+            @Override
+            public void close() {
+                childVector.close();
+            }
+        };
+    }
+
+    /**
+     * Compares {@code left} and {@code right} row by row and returns one comparison result per row (negative,
+     * zero or positive). Rows where either side is null are left at the array default of zero. The comparison
+     * routine is selected from the data type of {@code left}.
+     * <p>
+     * Only primitive data types are supported.
+     *
+     * @throws UnsupportedOperationException if the data type of {@code left} has no comparison routine
+     */
+    static int[] compare(ColumnVector left, ColumnVector right) {
+        checkArgument(left.getSize() == right.getSize(), "Left and right operand have different vector sizes.");
+        DataType type = left.getDataType();
+        int[] outcome = new int[left.getSize()];
+
+        if (type instanceof BooleanType) {
+            compareBoolean(left, right, outcome);
+        } else if (type instanceof ByteType) {
+            compareByte(left, right, outcome);
+        } else if (type instanceof ShortType) {
+            compareShort(left, right, outcome);
+        } else if (type instanceof IntegerType || type instanceof DateType) {
+            // dates are compared through their int representation
+            compareInt(left, right, outcome);
+        } else if (type instanceof LongType || type instanceof TimestampType || type instanceof TimestampNTZType) {
+            // both timestamp flavours are compared through their long representation
+            compareLong(left, right, outcome);
+        } else if (type instanceof FloatType) {
+            compareFloat(left, right, outcome);
+        } else if (type instanceof DoubleType) {
+            compareDouble(left, right, outcome);
+        } else if (type instanceof DecimalType) {
+            compareDecimal(left, right, outcome);
+        } else if (type instanceof StringType) {
+            compareString(left, right, outcome);
+        } else if (type instanceof BinaryType) {
+            compareBinary(left, right, outcome);
+        } else {
+            throw new UnsupportedOperationException(type + " can not be compared.");
+        }
+        return outcome;
+    }
+
+    /** Row-wise {@link Boolean#compare}; rows with a null operand are not written. */
+    static void compareBoolean(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Boolean.compare(left.getBoolean(row), right.getBoolean(row));
+        }
+    }
+
+    /** Row-wise {@link Byte#compare}; rows with a null operand are not written. */
+    static void compareByte(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Byte.compare(left.getByte(row), right.getByte(row));
+        }
+    }
+
+    /** Row-wise {@link Short#compare}; rows with a null operand are not written. */
+    static void compareShort(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Short.compare(left.getShort(row), right.getShort(row));
+        }
+    }
+
+    /** Row-wise {@link Integer#compare}; rows with a null operand are not written. */
+    static void compareInt(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Integer.compare(left.getInt(row), right.getInt(row));
+        }
+    }
+
+    /** Row-wise {@link Long#compare}; rows with a null operand are not written. */
+    static void compareLong(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Long.compare(left.getLong(row), right.getLong(row));
+        }
+    }
+
+    /** Row-wise {@link Float#compare}; rows with a null operand are not written. */
+    static void compareFloat(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Float.compare(left.getFloat(row), right.getFloat(row));
+        }
+    }
+
+    /** Row-wise {@link Double#compare}; rows with a null operand are not written. */
+    static void compareDouble(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Double.compare(left.getDouble(row), right.getDouble(row));
+        }
+    }
+
+    /** Row-wise comparison using {@link #STRING_COMPARATOR}; rows with a null operand are not written. */
+    static void compareString(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = STRING_COMPARATOR.compare(left.getString(row), right.getString(row));
+        }
+    }
+
+    /** Row-wise natural-order {@link BigDecimal} comparison; rows with a null operand are not written. */
+    static void compareDecimal(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = left.getDecimal(row).compareTo(right.getDecimal(row));
+        }
+    }
+
+    /** Row-wise comparison using {@link #BINARY_COMPARTOR}; rows with a null operand are not written. */
+    static void compareBinary(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = BINARY_COMPARTOR.compare(left.getBinary(row), right.getBinary(row));
+        }
+    }
+
+    /** Returns the child of {@code expression} at position {@code index}. */
+    static Expression childAt(Expression expression, int index) {
+        List<Expression> children = expression.getChildren();
+        return children.get(index);
+    }
+
+    /**
+     * Merges several column vectors into one: for every row, {@code idxToReturn} picks which of the input
+     * vectors supplies the value (and nullability) of that row.
+     *
+     * @param vectors List of ColumnVectors of the same data type with length >= 1
+     * @param idxToReturn Function that takes in a rowId and returns the index of the column vector
+     *                    to use as the return value
+     */
+    static ColumnVector combinationVector(List<ColumnVector> vectors, Function<Integer, Integer> idxToReturn) {
+        return new ColumnVector() {
+            // One-entry cache of the most recent resolution. Callers typically ask `isNullAt(rowId)` and then
+            // a value accessor for the same rowId, so remembering a single row avoids resolving it twice.
+            private int cachedRowId = -1;
+            private ColumnVector cachedVector = null;
+
+            private ColumnVector getVector(int rowId) {
+                if (rowId != cachedRowId) {
+                    cachedRowId = rowId;
+                    cachedVector = vectors.get(idxToReturn.apply(rowId));
+                }
+                return cachedVector;
+            }
+
+            @Override
+            public DataType getDataType() {
+                return vectors.get(0).getDataType();
+            }
+
+            @Override
+            public int getSize() {
+                return vectors.get(0).getSize();
+            }
+
+            @Override
+            public void close() {
+                Utils.closeCloseables(vectors.toArray(new ColumnVector[0]));
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return getVector(rowId).isNullAt(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return getVector(rowId).getBoolean(rowId);
+            }
+
+            @Override
+            public byte getByte(int rowId) {
+                return getVector(rowId).getByte(rowId);
+            }
+
+            @Override
+            public short getShort(int rowId) {
+                return getVector(rowId).getShort(rowId);
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                return getVector(rowId).getInt(rowId);
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                return getVector(rowId).getLong(rowId);
+            }
+
+            @Override
+            public float getFloat(int rowId) {
+                return getVector(rowId).getFloat(rowId);
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return getVector(rowId).getDouble(rowId);
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                return getVector(rowId).getDecimal(rowId);
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return getVector(rowId).getString(rowId);
+            }
+
+            @Override
+            public byte[] getBinary(int rowId) {
+                return getVector(rowId).getBinary(rowId);
+            }
+
+            @Override
+            public ArrayValue getArray(int rowId) {
+                return getVector(rowId).getArray(rowId);
+            }
+
+            @Override
+            public MapValue getMap(int rowId) {
+                return getVector(rowId).getMap(rowId);
+            }
+
+            @Override
+            public ColumnVector getChild(int ordinal) {
+                // Combine the matching child of every input vector, selected by the same row-to-index function.
+                List<ColumnVector> children =
+                        vectors.stream().map(v -> v.getChild(ordinal)).collect(Collectors.toList());
+                return combinationVector(children, idxToReturn);
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000..1b4e12e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.ExpressionHandler;
+
+public class DeltaEngine extends DefaultEngine {
+
+ protected DeltaEngine(Configuration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public ExpressionHandler getExpressionHandler() {
+ return new DeltaExpressionHandler();
+ }
+
+ public static DeltaEngine create(Configuration configuration) {
+ return new DeltaEngine(configuration);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
new file mode 100644
index 0000000..b3082b5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.ExpressionUtils.getLeft;
+import static io.delta.kernel.internal.util.ExpressionUtils.getRight;
+import static io.delta.kernel.internal.util.ExpressionUtils.getUnaryChild;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.booleanWrapperVector;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.childAt;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.compare;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.evalNullability;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.defaults.internal.expressions.DefaultExpressionEvaluator;
+import io.delta.kernel.expressions.AlwaysFalse;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Or;
+import io.delta.kernel.expressions.PartitionValueExpression;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.ScalarExpression;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Expression evaluator that walks an {@link Expression} tree with an {@link ExpressionVisitor}
+ * and produces one result {@link ColumnVector} for each input {@link ColumnarBatch}.
+ * <p>
+ * {@link #eval(ColumnarBatch)} is overridden and does not call the parent implementation;
+ * evaluation is done entirely by the private visitor declared in this class.
+ */
+public class DeltaExpressionEvaluator extends DefaultExpressionEvaluator {
+
+    /** Expression evaluated by {@link #eval(ColumnarBatch)}, stored exactly as passed to the constructor. */
+    private final Expression expression;
+
+    /**
+     * @param structType schema forwarded to the parent evaluator
+     * @param expression expression to evaluate against each batch
+     * @param dataType output type forwarded to the parent evaluator
+     */
+    public DeltaExpressionEvaluator(StructType structType, Expression expression, DataType dataType) {
+        super(structType, expression, dataType);
+        this.expression = expression;
+    }
+
+    /**
+     * Evaluates the stored expression on the given batch.
+     *
+     * @param input batch to evaluate against
+     * @return the vector produced by visiting the expression tree
+     */
+    @Override
+    public ColumnVector eval(ColumnarBatch input) {
+        return new ExpressionEvalVisitor(input).visit(expression);
+    }
+
+    /**
+     * Implementation of {@link ExpressionVisitor} to evaluate expression on a
+     * {@link ColumnarBatch}.
+     */
+    private static class ExpressionEvalVisitor extends ExpressionVisitor<ColumnVector> {
+        /** Batch every column reference and constant vector is resolved against. */
+        private final ColumnarBatch input;
+
+        ExpressionEvalVisitor(ColumnarBatch input) {
+            this.input = input;
+        }
+
+        /*
+        | Operand 1 | Operand 2 | `AND`      | `OR`       |
+        |-----------|-----------|------------|------------|
+        | True      | True      | True       | True       |
+        | True      | False     | False      | True       |
+        | True      | NULL      | NULL       | True       |
+        | False     | True      | False      | True       |
+        | False     | False     | False      | False      |
+        | False     | NULL      | False      | NULL       |
+        | NULL      | True      | NULL       | True       |
+        | NULL      | False     | False      | NULL       |
+        | NULL      | NULL      | NULL       | NULL       |
+        */
+
+        /**
+         * Three-valued logical AND (see the truth table above): false if either side is false,
+         * true if both sides are true, null otherwise.
+         */
+        @Override
+        ColumnVector visitAnd(And and) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(and);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                // getBoolean is only consulted for non-null rows (short-circuit on isNullAt).
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsFalse || rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else if (leftIsTrue && rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        /**
+         * Three-valued logical OR (see the truth table above): true if either side is true,
+         * false if both sides are false, null otherwise.
+         */
+        @Override
+        ColumnVector visitOr(Or or) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(or);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                // getBoolean is only consulted for non-null rows (short-circuit on isNullAt).
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsTrue || rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else if (leftIsFalse && rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        /** Constant {@code true} vector sized to the input batch. */
+        @Override
+        ColumnVector visitAlwaysTrue(AlwaysTrue alwaysTrue) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), true);
+        }
+
+        /** Constant {@code false} vector sized to the input batch. */
+        @Override
+        ColumnVector visitAlwaysFalse(AlwaysFalse alwaysFalse) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), false);
+        }
+
+        /**
+         * Evaluates the binary comparators {@code =, >, >=, <, <=} by comparing both child vectors
+         * row by row and mapping the sign of the comparison to a boolean.
+         *
+         * @throws IllegalStateException if the predicate name is not one of the five comparators
+         */
+        @Override
+        ColumnVector visitComparator(Predicate predicate) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(predicate);
+
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = evalNullability(argResults.leftResult, argResults.rightResult);
+            int[] compareResult = compare(argResults.leftResult, argResults.rightResult);
+            switch (predicate.getName()) {
+                case "=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] == 0;
+                    }
+                    break;
+                case ">":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] > 0;
+                    }
+                    break;
+                case ">=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] >= 0;
+                    }
+                    break;
+                case "<":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] < 0;
+                    }
+                    break;
+                case "<=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] <= 0;
+                    }
+                    break;
+                default:
+                    // We should never reach this based on the ExpressionVisitor
+                    throw new IllegalStateException(
+                            String.format("%s is not a recognized comparator", predicate.getName()));
+            }
+
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        /**
+         * Turns a literal of a supported primitive, string, binary, decimal, date or timestamp type
+         * into a constant vector sized to the input batch.
+         *
+         * @throws UnsupportedOperationException for literals of any other type
+         */
+        @Override
+        ColumnVector visitLiteral(Literal literal) {
+            DataType dataType = literal.getDataType();
+            if (dataType instanceof BooleanType || dataType instanceof ByteType || dataType instanceof ShortType
+                    || dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType
+                    || dataType instanceof DoubleType || dataType instanceof StringType
+                    || dataType instanceof BinaryType || dataType instanceof DecimalType || dataType instanceof DateType
+                    || dataType instanceof TimestampType || dataType instanceof TimestampNTZType) {
+                return new DefaultConstantVector(dataType, input.getSize(), literal.getValue());
+            }
+
+            throw new UnsupportedOperationException("unsupported expression encountered: " + literal);
+        }
+
+        /**
+         * Resolves a (possibly nested) column reference by descending one struct level per name part.
+         *
+         * @throws IllegalArgumentException if any part of the path is missing from the batch schema
+         */
+        @Override
+        ColumnVector visitColumn(Column column) {
+            String[] names = column.getNames();
+            DataType currentType = input.getSchema();
+            ColumnVector columnVector = null;
+            for (int level = 0; level < names.length; level++) {
+                assertColumnExists(currentType instanceof StructType, input.getSchema(), column);
+                StructType structSchema = ((StructType) currentType);
+                int ordinal = structSchema.indexOf(names[level]);
+                assertColumnExists(ordinal != -1, input.getSchema(), column);
+                currentType = structSchema.at(ordinal).getDataType();
+
+                if (level == 0) {
+                    // Top-level columns come from the batch, nested ones from the parent vector.
+                    columnVector = input.getColumnVector(ordinal);
+                } else {
+                    columnVector = columnVector.getChild(ordinal);
+                }
+            }
+            // Still null when the column path had no parts at all.
+            assertColumnExists(columnVector != null, input.getSchema(), column);
+            return columnVector;
+        }
+
+        /** Evaluates the cast's input and lets the cast expression convert the resulting vector. */
+        @Override
+        ColumnVector visitCast(ImplicitCastExpression cast) {
+            ColumnVector inputResult = visit(cast.getInput());
+            return cast.eval(inputResult);
+        }
+
+        /** Evaluates the child and delegates the conversion to {@code PartitionValueEvaluator}. */
+        @Override
+        ColumnVector visitPartitionValue(PartitionValueExpression partitionValue) {
+            // Named differently from the 'input' field to avoid shadowing the batch.
+            ColumnVector serializedValues = visit(partitionValue.getInput());
+            return PartitionValueEvaluator.eval(serializedValues, partitionValue.getDataType());
+        }
+
+        /** Evaluates {@code element_at(map, key)}; child 0 is the map, child 1 the lookup key. */
+        @Override
+        ColumnVector visitElementAt(ScalarExpression elementAt) {
+            ColumnVector map = visit(childAt(elementAt, 0));
+            ColumnVector lookupKey = visit(childAt(elementAt, 1));
+            return ElementAtEvaluator.eval(map, lookupKey);
+        }
+
+        /** Logical NOT: negates the child value and keeps the child's nullability. */
+        @Override
+        ColumnVector visitNot(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.getBoolean(rowId),
+                    rowId -> childResult.isNullAt(rowId));
+        }
+
+        /** {@code IS NOT NULL}: the result itself is never null. */
+        @Override
+        ColumnVector visitIsNotNull(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        /** {@code IS NULL}: the result itself is never null. */
+        @Override
+        ColumnVector visitIsNull(Predicate predicate) {
+            ColumnVector childResult = visit(getUnaryChild(predicate));
+            return booleanWrapperVector(childResult, rowId -> childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        /** {@code COALESCE}: for each row, picks the first child whose value is not null. */
+        @Override
+        ColumnVector visitCoalesce(ScalarExpression coalesce) {
+            List<ColumnVector> childResults =
+                    coalesce.getChildren().stream().map(this::visit).collect(Collectors.toList());
+            return DefaultExpressionUtils.combinationVector(childResults, rowId -> {
+                for (int idx = 0; idx < childResults.size(); idx++) {
+                    if (!childResults.get(idx).isNullAt(rowId)) {
+                        return idx;
+                    }
+                }
+                return 0; // If all are null then any idx suffices
+            });
+        }
+
+        /**
+         * Utility method to evaluate inputs to the binary input expression. Also validates the
+         * evaluated expression result {@link ColumnVector}s are of the same size.
+         *
+         * @param predicate binary predicate whose two children are evaluated
+         * @return Triplet of (result vector size, left operand result, right operand result)
+         * @throws IllegalArgumentException if the two operand vectors differ in size
+         */
+        private PredicateChildrenEvalResult evalBinaryExpressionChildren(Predicate predicate) {
+            ColumnVector left = visit(getLeft(predicate));
+            ColumnVector right = visit(getRight(predicate));
+            // Both placeholders must be %d; a bare "d" would silently drop the right-hand size.
+            checkArgument(left.getSize() == right.getSize(),
+                    "Left and right operand returned different results: left=%d, right=%d", left.getSize(),
+                    right.getSize());
+            return new PredicateChildrenEvalResult(left.getSize(), left, right);
+        }
+    }
+
+    /**
+     * Encapsulates children expression result of binary input predicate
+     */
+    private static class PredicateChildrenEvalResult {
+        public final int rowCount;
+        public final ColumnVector leftResult;
+        public final ColumnVector rightResult;
+
+        PredicateChildrenEvalResult(int rowCount, ColumnVector leftResult, ColumnVector rightResult) {
+            this.rowCount = rowCount;
+            this.leftResult = leftResult;
+            this.rightResult = rightResult;
+        }
+    }
+
+    /**
+     * Throws if {@code condition} is false, reporting the column and the full input schema.
+     *
+     * @throws IllegalArgumentException when {@code condition} does not hold
+     */
+    private static void assertColumnExists(boolean condition, StructType schema, Column column) {
+        if (!condition) {
+            throw new IllegalArgumentException(
+                    String.format("%s doesn't exist in input data schema: %s", column, schema));
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
new file mode 100644
index 0000000..d9f5251
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import io.delta.kernel.defaults.engine.DefaultExpressionHandler;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * {@link DefaultExpressionHandler} specialization that hands out the evaluators defined in this
+ * package: {@link DeltaExpressionEvaluator} and {@link DeltaPredicateEvaluator}.
+ */
+public class DeltaExpressionHandler extends DefaultExpressionHandler {
+
+    /**
+     * @param inputSchema schema of the batches the predicate will be evaluated on
+     * @param predicate predicate to evaluate
+     * @return a new {@link DeltaPredicateEvaluator} for the given schema and predicate
+     */
+    @Override
+    public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        final PredicateEvaluator predicateEvaluator = new DeltaPredicateEvaluator(inputSchema, predicate);
+        return predicateEvaluator;
+    }
+
+    /**
+     * @param inputSchema schema of the batches the expression will be evaluated on
+     * @param expression expression to evaluate
+     * @param outputType expected type of the evaluation result
+     * @return a new {@link DeltaExpressionEvaluator} for the given arguments
+     */
+    @Override
+    public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression expression, DataType outputType) {
+        final ExpressionEvaluator expressionEvaluator =
+                new DeltaExpressionEvaluator(inputSchema, expression, outputType);
+        return expressionEvaluator;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b..44dfdf0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -22,8 +22,14 @@
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -39,15 +45,40 @@
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.engine.ExpressionHandler;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.internal.data.SelectionColumnVector;
+import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
+import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
+import io.delta.kernel.internal.util.ColumnMapping;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.internal.util.Tuple2;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
@@ -74,7 +105,7 @@
public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
String filterExpressionStr) throws HyracksDataException {
JobConf conf = config.getConf();
- this.engine = DefaultEngine.create(conf);
+ this.engine = DeltaEngine.create(conf);
this.scanFiles = new ArrayList<>();
for (String scanFile : serScanFiles) {
this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
@@ -94,7 +125,7 @@
try {
this.physicalDataIter = engine.getParquetHandler()
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
- this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ this.dataIter = transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
if (dataIter.hasNext()) {
rows = dataIter.next().getRows();
}
@@ -165,4 +196,182 @@
public boolean handleException(Throwable th) {
return false;
}
+
+ /**
+ * Wraps the iterator of physical batches read for one scan file and turns each batch into a
+ * {@link FilteredColumnarBatch}. This reader calls it in place of {@code Scan.transformPhysicalData}.
+ * <p>
+ * For every batch the returned iterator builds a selection vector when the scan file carries a deletion
+ * vector, removes the row-index metadata column if present, attaches partition columns through
+ * {@link #withPartitionColumns}, and switches to the logical schema when column mapping mode is name or id.
+ *
+ * @param engine engine used to read schemas from the scan state, load deletion vectors and evaluate expressions
+ * @param scanState scan state row (schemas, table root, column mapping mode)
+ * @param scanFile scan file row (deletion vector descriptor, partition values)
+ * @param physicalDataIter physical batches of the scan file; closed when the returned iterator is closed
+ * @return iterator over the transformed batches
+ */
+ static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(Engine engine, Row scanState, Row scanFile,
+ CloseableIterator<ColumnarBatch> physicalDataIter) throws IOException {
+ return new CloseableIterator<FilteredColumnarBatch>() {
+ // set once initIfRequired() has populated the fields below
+ boolean inited = false;
+
+ // initialized as part of initIfRequired()
+ StructType physicalReadSchema = null;
+ StructType logicalReadSchema = null;
+ String tablePath = null;
+
+ // most recently loaded deletion vector and its bitmap, reused while the descriptor stays the same
+ RoaringBitmapArray currBitmap = null;
+ DeletionVectorDescriptor currDV = null;
+
+ // Lazily reads the schemas and table root from the scan state on first use.
+ private void initIfRequired() {
+ if (inited) {
+ return;
+ }
+ physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, scanState);
+ logicalReadSchema = ScanStateRow.getLogicalSchema(engine, scanState);
+
+ tablePath = ScanStateRow.getTableRoot(scanState);
+ inited = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ physicalDataIter.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ initIfRequired();
+ return physicalDataIter.hasNext();
+ }
+
+ @Override
+ public FilteredColumnarBatch next() {
+ initIfRequired();
+ ColumnarBatch nextDataBatch = physicalDataIter.next();
+
+ DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFile);
+
+ // -1 when the batch has no row-index metadata column
+ int rowIndexOrdinal = nextDataBatch.getSchema().indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME);
+
+ // Get the selectionVector if DV is present
+ Optional<ColumnVector> selectionVector;
+ if (dv == null) {
+ selectionVector = Optional.empty();
+ } else {
+ if (rowIndexOrdinal == -1) {
+ throw new IllegalArgumentException(
+ "Row index column is not " + "present in the data read from the Parquet file.");
+ }
+ // Reload the bitmap only when the descriptor differs from the cached one.
+ if (!dv.equals(currDV)) {
+ Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> dvInfo =
+ DeletionVectorUtils.loadNewDvAndBitmap(engine, tablePath, dv);
+ this.currDV = dvInfo._1;
+ this.currBitmap = dvInfo._2;
+ }
+ // The row-index vector is fetched here, before the column is removed from the batch below.
+ ColumnVector rowIndexVector = nextDataBatch.getColumnVector(rowIndexOrdinal);
+ selectionVector = Optional.of(new SelectionColumnVector(currBitmap, rowIndexVector));
+ }
+ // Drop the row-index metadata column from the returned batch.
+ if (rowIndexOrdinal != -1) {
+ nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
+ }
+
+ // Add partition columns
+ nextDataBatch = withPartitionColumns(engine.getExpressionHandler(), nextDataBatch,
+ InternalScanFileUtils.getPartitionValues(scanFile), physicalReadSchema);
+
+ // Change back to logical schema
+ String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState);
+ switch (columnMappingMode) {
+ case ColumnMapping.COLUMN_MAPPING_MODE_NAME:
+ case ColumnMapping.COLUMN_MAPPING_MODE_ID:
+ nextDataBatch = nextDataBatch.withNewSchema(logicalReadSchema);
+ break;
+ case ColumnMapping.COLUMN_MAPPING_MODE_NONE:
+ // batch is returned with the schema it already has
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Column mapping mode is not yet supported: " + columnMappingMode);
+ }
+
+ return new FilteredColumnarBatch(nextDataBatch, selectionVector);
+ }
+ };
+ }
+
+ /**
+  * Attaches partition columns to a data batch.
+  * <p>
+  * Walks {@code schemaWithPartitionCols} in order. For each field that has an entry in
+  * {@code partitionValues}, the value is converted to a literal, evaluated against the current batch and
+  * inserted at the field's position in that schema.
+  *
+  * @param expressionHandler handler that supplies the evaluator for each partition literal
+  * @param dataBatch batch to extend
+  * @param partitionValues partition column name to serialized value; may be {@code null} or empty
+  * @param schemaWithPartitionCols schema that determines the position and type of each partition column
+  * @return {@code dataBatch} itself when there are no partition values, otherwise the extended batch
+  */
+ public static ColumnarBatch withPartitionColumns(ExpressionHandler expressionHandler, ColumnarBatch dataBatch,
+         Map<String, String> partitionValues, StructType schemaWithPartitionCols) {
+     // Nothing to attach.
+     if (partitionValues == null || partitionValues.isEmpty()) {
+         return dataBatch;
+     }
+
+     ColumnarBatch enriched = dataBatch;
+     for (int position = 0; position < schemaWithPartitionCols.length(); position++) {
+         StructField field = schemaWithPartitionCols.at(position);
+         if (!partitionValues.containsKey(field.getName())) {
+             continue;
+         }
+
+         // The evaluator is built against the batch as extended so far.
+         StructType currentSchema = enriched.getSchema();
+         Literal partitionLiteral =
+                 literalForPartitionValue(field.getDataType(), partitionValues.get(field.getName()));
+         ExpressionEvaluator literalEvaluator =
+                 expressionHandler.getEvaluator(currentSchema, partitionLiteral, field.getDataType());
+
+         ColumnVector partitionColumn = literalEvaluator.eval(enriched);
+         enriched = enriched.withNewColumn(position, field, partitionColumn);
+     }
+     return enriched;
+ }
+
+ /**
+  * Converts the serialized (string) form of a partition value into a typed {@link Literal}.
+  * <p>
+  * Timestamp values are parsed with {@link Timestamp#valueOf(String)} first
+  * ({@code yyyy-[m]m-[d]d hh:mm:ss[.f...]}). If that is rejected, they are parsed as an ISO-8601 instant
+  * via {@link Instant#parse(CharSequence)} (for example {@code 2024-01-01T00:00:00Z}).
+  *
+  * @param dataType declared type of the partition column
+  * @param partitionValue serialized partition value; {@code null} yields a null literal of {@code dataType}
+  * @return literal holding the parsed value
+  * @throws UnsupportedOperationException if {@code dataType} is not one of the handled types
+  * @throws RuntimeException the parser's own unchecked exception (e.g. {@link NumberFormatException},
+  *         {@link java.time.format.DateTimeParseException}) when the value is malformed for its type
+  */
+ protected static Literal literalForPartitionValue(DataType dataType, String partitionValue) {
+     if (partitionValue == null) {
+         return Literal.ofNull(dataType);
+     }
+
+     if (dataType instanceof BooleanType) {
+         return Literal.ofBoolean(Boolean.parseBoolean(partitionValue));
+     }
+     if (dataType instanceof ByteType) {
+         return Literal.ofByte(Byte.parseByte(partitionValue));
+     }
+     if (dataType instanceof ShortType) {
+         return Literal.ofShort(Short.parseShort(partitionValue));
+     }
+     if (dataType instanceof IntegerType) {
+         return Literal.ofInt(Integer.parseInt(partitionValue));
+     }
+     if (dataType instanceof LongType) {
+         return Literal.ofLong(Long.parseLong(partitionValue));
+     }
+     if (dataType instanceof FloatType) {
+         return Literal.ofFloat(Float.parseFloat(partitionValue));
+     }
+     if (dataType instanceof DoubleType) {
+         return Literal.ofDouble(Double.parseDouble(partitionValue));
+     }
+     if (dataType instanceof StringType) {
+         return Literal.ofString(partitionValue);
+     }
+     if (dataType instanceof BinaryType) {
+         // Pin the charset: the no-arg getBytes() depends on the JVM's default charset, so the same
+         // partition value could decode to different bytes on differently configured nodes.
+         // NOTE(review): UTF-8 matches the common platform default; confirm against the table writer.
+         return Literal.ofBinary(partitionValue.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+     }
+     if (dataType instanceof DateType) {
+         return Literal.ofDate(InternalUtils.daysSinceEpoch(Date.valueOf(partitionValue)));
+     }
+     if (dataType instanceof DecimalType) {
+         DecimalType decimalType = (DecimalType) dataType;
+         return Literal.ofDecimal(new BigDecimal(partitionValue), decimalType.getPrecision(),
+                 decimalType.getScale());
+     }
+     if (dataType instanceof TimestampType) {
+         return Literal.ofTimestamp(parseTimestampMicros(partitionValue));
+     }
+     if (dataType instanceof TimestampNTZType) {
+         // timestamp and timestamp_ntz partition values are parsed the same way here.
+         // NOTE(review): the original comment says both are interpreted in the local time zone; confirm.
+         return Literal.ofTimestampNtz(parseTimestampMicros(partitionValue));
+     }
+
+     throw new UnsupportedOperationException("Unsupported partition column: " + dataType);
+ }
+
+ /**
+  * Parses a timestamp partition value into microseconds since the epoch, trying the JDBC escape format first
+  * and an ISO-8601 instant second.
+  */
+ private static long parseTimestampMicros(String partitionValue) {
+     try {
+         return InternalUtils.microsSinceEpoch(Timestamp.valueOf(partitionValue));
+     } catch (IllegalArgumentException jdbcFormatFailure) {
+         try {
+             return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(partitionValue));
+         } catch (java.time.format.DateTimeParseException isoFormatFailure) {
+             // Keep the first failure visible so both rejected formats show up in the stack trace.
+             isoFormatFailure.addSuppressed(jdbcFormatFailure);
+             throw isoFormatFailure;
+         }
+     }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
new file mode 100644
index 0000000..494d3bc
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.Optional;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+/**
+ * {@link PredicateEvaluator} that evaluates a predicate together with an optional, already existing
+ * selection vector.
+ * <p>
+ * The predicate is rewritten at construction time to
+ * {@code (<existing selection column> = true) AND <predicate>}, and the input schema is extended with that
+ * boolean column. At evaluation time the existing selection vector (or an all-{@code true} constant vector
+ * when none is given) is appended to the batch as that column.
+ */
+public class DeltaPredicateEvaluator implements PredicateEvaluator {
+
+    /** Name of the synthetic column that carries the existing selection vector. */
+    private static final String EXISTING_SEL_VECTOR_COL_NAME = "____existing_selection_vector_value____";
+    private static final StructField EXISTING_SEL_VECTOR_FIELD =
+            new StructField(EXISTING_SEL_VECTOR_COL_NAME, BooleanType.BOOLEAN, false);
+
+    /** Evaluator for the rewritten predicate over the extended schema. */
+    private final ExpressionEvaluator expressionEvaluator;
+
+    /**
+     * @param inputSchema schema of the batches passed to {@link #eval}
+     * @param predicate predicate to evaluate on those batches
+     */
+    public DeltaPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        Predicate rewrittenPredicate = new And(
+                new Predicate("=", new Column(EXISTING_SEL_VECTOR_COL_NAME), Literal.ofBoolean(true)), predicate);
+        StructType rewrittenInputSchema = inputSchema.add(EXISTING_SEL_VECTOR_FIELD);
+        this.expressionEvaluator =
+                new DeltaExpressionEvaluator(rewrittenInputSchema, rewrittenPredicate, BooleanType.BOOLEAN);
+    }
+
+    /**
+     * Evaluates the rewritten predicate on {@code inputData}.
+     *
+     * @param inputData batch to evaluate
+     * @param existingSelectionVector selection vector from earlier filtering, if any; it is closed before this
+     *        method returns, whether evaluation succeeds or fails
+     * @return the vector produced by evaluating the rewritten predicate
+     */
+    @Override
+    public ColumnVector eval(ColumnarBatch inputData, Optional<ColumnVector> existingSelectionVector) {
+        try {
+            // orElseGet: only build the all-true default when no selection vector was supplied.
+            ColumnVector newVector = existingSelectionVector
+                    .orElseGet(() -> new DefaultConstantVector(BooleanType.BOOLEAN, inputData.getSize(), true));
+            // Append the selection column after the last existing column.
+            ColumnarBatch withExistingSelVector =
+                    inputData.withNewColumn(inputData.getSchema().length(), EXISTING_SEL_VECTOR_FIELD, newVector);
+
+            return expressionEvaluator.eval(withExistingSelVector);
+        } finally {
+            // release the existing selection vector.
+            Utils.closeCloseables(existingSelectionVector.orElse(null));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index b76dd4d..10a527e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -60,7 +60,6 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.KernelException;
@@ -104,7 +103,7 @@
configureJobConf(appCtx, conf, configuration);
confFactory = new ConfFactory(conf);
String tableMetadataPath = getTablePath(configuration);
- Engine engine = DefaultEngine.create(conf);
+ Engine engine = DeltaEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
Snapshot snapshot;
try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
new file mode 100644
index 0000000..388a287
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.defaults.internal.DefaultEngineErrors.unsupportedExpressionException;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static org.apache.asterix.external.input.record.reader.aws.delta.ImplicitCastExpression.canCastTo;
+
+import java.util.Arrays;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.ScalarExpression;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.StringType;
+
+/**
+ * Utility methods to validate and evaluate the {@code element_at} expression.
+ */
+class ElementAtEvaluator {
+    private ElementAtEvaluator() {
+    }
+
+    /**
+     * Validate and transform the {@code element_at} expression with given validated and
+     * transformed inputs. A lookup key whose type differs from the map key type is wrapped in
+     * an {@link ImplicitCastExpression} when such a cast is allowed, and rejected otherwise.
+     *
+     * @param elementAt original expression; supplies the name of the result and error context
+     * @param mapInput already transformed map input expression
+     * @param mapInputType type of {@code mapInput}; only {@code map(string, string)} is supported
+     * @param lookupKey already transformed lookup key expression
+     * @param lookupKeyType type of {@code lookupKey}
+     * @return a new expression named like {@code elementAt} over the map and (possibly cast) key
+     */
+    static ScalarExpression validateAndTransform(ScalarExpression elementAt, Expression mapInput, DataType mapInputType,
+            Expression lookupKey, DataType lookupKeyType) {
+
+        MapType asMapType = validateSupportedMapType(elementAt, mapInputType);
+        DataType keyTypeFromMapInput = asMapType.getKeyType();
+
+        if (!keyTypeFromMapInput.equivalent(lookupKeyType)) {
+            if (canCastTo(lookupKeyType, keyTypeFromMapInput)) {
+                lookupKey = new ImplicitCastExpression(lookupKey, keyTypeFromMapInput);
+            } else {
+                String reason = format("lookup key type (%s) is different from the map key type (%s)", lookupKeyType,
+                        keyTypeFromMapInput);
+                throw unsupportedExpressionException(elementAt, reason);
+            }
+        }
+        return new ScalarExpression(elementAt.getName(), Arrays.asList(mapInput, lookupKey));
+    }
+
+    /**
+     * Utility method to evaluate the {@code element_at} on given map and key vectors.
+     * A row of the result is null when the map is null at that row, when the key is not
+     * present in the map, or when the value stored under the key is null.
+     *
+     * @param map {@link ColumnVector} of {@code map(string, string)} type.
+     * @param lookupKey {@link ColumnVector} of {@code string} type.
+     * @return result {@link ColumnVector} containing the lookup values.
+     */
+    static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) {
+        return new ColumnVector() {
+            // Store the last lookup value to avoid multiple lookups for the same row id.
+            // The general pattern is a call to `isNullAt(rowId)` followed by `getString`,
+            // so a cache of one value is enough.
+            private int lastLookupRowId = -1;
+            private String lastLookupValue = null;
+
+            @Override
+            public DataType getDataType() {
+                return ((MapType) map.getDataType()).getValueType();
+            }
+
+            @Override
+            public int getSize() {
+                return map.getSize();
+            }
+
+            @Override
+            public void close() {
+                Utils.closeCloseables(map, lookupKey);
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return lookupValue(rowId) == null;
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return lookupValue(rowId);
+            }
+
+            /**
+             * Returns the looked-up value for {@code rowId}, or null if there is none. The result
+             * is served from the single-entry cache when {@code rowId} was the last row resolved.
+             */
+            private String lookupValue(int rowId) {
+                if (rowId != lastLookupRowId) {
+                    // Resolve into a local and record the row id last: if the lookup throws,
+                    // the cache must not end up pairing the new row id with a stale value.
+                    String value = null;
+                    if (!map.isNullAt(rowId)) {
+                        String keyValue = lookupKey.getString(rowId);
+                        value = findValueForKey(map.getMap(rowId), keyValue);
+                    }
+                    lastLookupValue = value;
+                    lastLookupRowId = rowId;
+                }
+                return lastLookupValue;
+            }
+
+            /**
+             * Given a {@link MapValue} and string {@code key} find the corresponding value.
+             * Returns null if the key is not in the map.
+             * @param mapValue String->String map to search
+             * @param key the key to look up the value for; may be null
+             */
+            private String findValueForKey(MapValue mapValue, String key) {
+                ColumnVector keyVector = mapValue.getKeys();
+                for (int i = 0; i < mapValue.getSize(); i++) {
+                    // A null lookup key matches only a null map key.
+                    boolean matches = keyVector.isNullAt(i) ? key == null : keyVector.getString(i).equals(key);
+                    if (matches) {
+                        ColumnVector valueVector = mapValue.getValues();
+                        return valueVector.isNullAt(i) ? null : valueVector.getString(i);
+                    }
+                }
+                // If the key is not in the map return null
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Checks that {@code mapInputType} is a {@code map(string, string)} and returns it as a
+     * {@link MapType}.
+     */
+    private static MapType validateSupportedMapType(Expression elementAt, DataType mapInputType) {
+        checkArgument(mapInputType instanceof MapType, "expected a map type input as first argument: " + elementAt);
+        MapType asMapType = (MapType) mapInputType;
+        // For now we only need to support lookup in columns of type `map(string -> string)`.
+        // Additional type support may be added later
+        if (asMapType.getKeyType().equivalent(StringType.STRING)
+                && asMapType.getValueType().equivalent(StringType.STRING)) {
+            return asMapType;
+        }
+        throw new UnsupportedOperationException(
+                format("%s: Supported only on type map(string, string) input data", elementAt));
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
new file mode 100644
index 0000000..7e2a65b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
+import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;
+import static java.util.stream.Collectors.joining;
+
+import java.util.List;
+import java.util.Locale;
+
+import io.delta.kernel.expressions.AlwaysFalse;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Or;
+import io.delta.kernel.expressions.PartitionValueExpression;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.ScalarExpression;
+
+/**
+ * Base class for walking an expression tree: {@link #visit(Expression)} inspects the given
+ * expression and hands it to the handler method for its specific expression type.
+ *
+ * @param <R> Return type of result of visit expression methods.
+ */
+abstract class ExpressionVisitor<R> {
+
+    abstract R visitAnd(And and);
+
+    abstract R visitOr(Or or);
+
+    abstract R visitAlwaysTrue(AlwaysTrue alwaysTrue);
+
+    abstract R visitAlwaysFalse(AlwaysFalse alwaysFalse);
+
+    abstract R visitComparator(Predicate predicate);
+
+    abstract R visitLiteral(Literal literal);
+
+    abstract R visitColumn(Column column);
+
+    abstract R visitCast(ImplicitCastExpression cast);
+
+    abstract R visitPartitionValue(PartitionValueExpression partitionValue);
+
+    abstract R visitElementAt(ScalarExpression elementAt);
+
+    abstract R visitNot(Predicate predicate);
+
+    abstract R visitIsNotNull(Predicate predicate);
+
+    abstract R visitIsNull(Predicate predicate);
+
+    abstract R visitCoalesce(ScalarExpression ifNull);
+
+    /**
+     * Entry point of the visitor: routes {@code expression} to the handler for its type.
+     *
+     * @param expression expression to visit
+     * @return whatever the selected handler returns
+     * @throws UnsupportedOperationException if no handler exists for the expression
+     */
+    final R visit(Expression expression) {
+        if (expression instanceof PartitionValueExpression) {
+            return visitPartitionValue((PartitionValueExpression) expression);
+        }
+        if (expression instanceof ScalarExpression) {
+            return visitScalarExpression((ScalarExpression) expression);
+        }
+        if (expression instanceof Literal) {
+            return visitLiteral((Literal) expression);
+        }
+        if (expression instanceof Column) {
+            return visitColumn((Column) expression);
+        }
+        if (expression instanceof ImplicitCastExpression) {
+            return visitCast((ImplicitCastExpression) expression);
+        }
+        throw new UnsupportedOperationException(String.format("Expression %s is not supported.", expression));
+    }
+
+    /**
+     * Routes a scalar expression by its upper-cased name. Logical operators and comparators are
+     * rebuilt as {@link And}, {@link Or} or {@link Predicate} before being handed over.
+     */
+    private R visitScalarExpression(ScalarExpression expression) {
+        final List<Expression> operands = expression.getChildren();
+        final String operator = expression.getName().toUpperCase(Locale.ENGLISH);
+        switch (operator) {
+            case "ALWAYS_TRUE":
+                return visitAlwaysTrue(ALWAYS_TRUE);
+            case "ALWAYS_FALSE":
+                return visitAlwaysFalse(ALWAYS_FALSE);
+            case "AND":
+                return visitAnd(new And(elemAsPredicate(operands, 0), elemAsPredicate(operands, 1)));
+            case "OR":
+                return visitOr(new Or(elemAsPredicate(operands, 0), elemAsPredicate(operands, 1)));
+            case "=":
+            case "<":
+            case "<=":
+            case ">":
+            case ">=":
+                return visitComparator(new Predicate(operator, operands));
+            case "ELEMENT_AT":
+                return visitElementAt(expression);
+            case "NOT":
+                return visitNot(new Predicate(operator, operands));
+            case "IS_NOT_NULL":
+                return visitIsNotNull(new Predicate(operator, operands));
+            case "IS_NULL":
+                return visitIsNull(new Predicate(operator, operands));
+            case "COALESCE":
+                return visitCoalesce(expression);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Scalar expression `%s` is not supported.", operator));
+        }
+    }
+
+    /**
+     * Returns the entry at {@code index} of {@code expressions} as a {@link Predicate}.
+     *
+     * @throws RuntimeException if the list has no such entry or the entry is not a predicate
+     */
+    private static Predicate elemAsPredicate(List<Expression> expressions, int index) {
+        if (index >= expressions.size()) {
+            String listing = expressions.stream().map(Object::toString).collect(joining(","));
+            throw new RuntimeException(
+                    String.format("Trying to access invalid entry (%d) in list %s", index, listing));
+        }
+        Expression candidate = expressions.get(index);
+        if (candidate instanceof Predicate) {
+            return (Predicate) candidate;
+        }
+        throw new RuntimeException("Expected a predicate, but got " + candidate);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
new file mode 100644
index 0000000..febd4d3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static java.lang.String.format;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.types.DataType;
+
+/**
+ * Expression that implicitly casts its input to another given type. The valid casts are:
+ * <ul>
+ * <li>{@code byte} to {@code short, int, long, float, double}</li>
+ * <li>{@code short} to {@code int, long, float, double}</li>
+ * <li>{@code int} to {@code long, float, double}</li>
+ * <li>{@code long} to {@code float, double}</li>
+ * <li>{@code float} to {@code double}</li>
+ * </ul>
+ * <p>
+ * The above list is not exhaustive. Based on the need, we can add more casts.
+ */
+final class ImplicitCastExpression implements Expression {
+    /**
+     * For each source type name, the names of the types it can be cast to.
+     */
+    private static final Map<String, List<String>> UP_CASTABLE_TYPE_TABLE;
+
+    static {
+        Map<String, List<String>> table = new HashMap<>();
+        table.put("byte", Arrays.asList("short", "integer", "long", "float", "double"));
+        table.put("short", Arrays.asList("integer", "long", "float", "double"));
+        table.put("integer", Arrays.asList("long", "float", "double"));
+        table.put("long", Arrays.asList("float", "double"));
+        table.put("float", Arrays.asList("double"));
+        UP_CASTABLE_TYPE_TABLE = unmodifiableMap(table);
+    }
+
+    private final Expression input;
+    private final DataType outputType;
+
+    /**
+     * Wraps {@code input} in a cast to {@code outputType}. The caller is responsible for
+     * checking beforehand, with {@link #canCastTo(DataType, DataType)}, that the cast is valid.
+     */
+    ImplicitCastExpression(Expression input, DataType outputType) {
+        this.input = requireNonNull(input, "input is null");
+        this.outputType = requireNonNull(outputType, "outputType is null");
+    }
+
+    public Expression getInput() {
+        return input;
+    }
+
+    public DataType getOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.singletonList(input);
+    }
+
+    /**
+     * Applies the cast to the evaluated input of this expression.
+     *
+     * @param input {@link ColumnVector} data of the input to the cast expression.
+     * @return {@link ColumnVector} that reports the output type of this cast and reads its
+     *         values from {@code input}.
+     * @throws UnsupportedOperationException if the type of {@code input} is not a supported source
+     */
+    ColumnVector eval(ColumnVector input) {
+        final String sourceTypeName = input.getDataType().toString();
+        switch (sourceTypeName) {
+            case "byte":
+                return new ByteUpConverter(outputType, input);
+            case "short":
+                return new ShortUpConverter(outputType, input);
+            case "integer":
+                return new IntUpConverter(outputType, input);
+            case "long":
+                return new LongUpConverter(outputType, input);
+            case "float":
+                return new FloatUpConverter(outputType, input);
+            default:
+                throw new UnsupportedOperationException(format("Cast from %s is not supported", sourceTypeName));
+        }
+    }
+
+    /**
+     * Tells whether a value of type {@code from} can be implicitly cast to type {@code to}.
+     */
+    static boolean canCastTo(DataType from, DataType to) {
+        // TODO: The type name should be a first class method on `DataType` instead of getting it
+        // using the `toString`.
+        String sourceTypeName = from.toString();
+        String targetTypeName = to.toString();
+        List<String> allowedTargets = UP_CASTABLE_TYPE_TABLE.get(sourceTypeName);
+        return allowedTargets != null && allowedTargets.contains(targetTypeName);
+    }
+
+    /**
+     * Common part of the up-casting vectors: the reported type is the target type, while
+     * nullability, size and closing are delegated to the wrapped input vector.
+     */
+    private abstract static class UpConverter implements ColumnVector {
+        protected final DataType targetType;
+        protected final ColumnVector source;
+
+        UpConverter(DataType targetType, ColumnVector source) {
+            this.targetType = targetType;
+            this.source = source;
+        }
+
+        @Override
+        public DataType getDataType() {
+            return targetType;
+        }
+
+        @Override
+        public boolean isNullAt(int rowId) {
+            return source.isNullAt(rowId);
+        }
+
+        @Override
+        public int getSize() {
+            return source.getSize();
+        }
+
+        @Override
+        public void close() {
+            source.close();
+        }
+    }
+
+    /** Reads {@code byte} data as {@code short}, {@code int}, {@code long}, {@code float} or {@code double}. */
+    private static class ByteUpConverter extends UpConverter {
+        ByteUpConverter(DataType targetType, ColumnVector source) {
+            super(targetType, source);
+        }
+
+        @Override
+        public short getShort(int rowId) {
+            return source.getByte(rowId);
+        }
+
+        @Override
+        public int getInt(int rowId) {
+            return source.getByte(rowId);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return source.getByte(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return source.getByte(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return source.getByte(rowId);
+        }
+    }
+
+    /** Reads {@code short} data as {@code int}, {@code long}, {@code float} or {@code double}. */
+    private static class ShortUpConverter extends UpConverter {
+        ShortUpConverter(DataType targetType, ColumnVector source) {
+            super(targetType, source);
+        }
+
+        @Override
+        public int getInt(int rowId) {
+            return source.getShort(rowId);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return source.getShort(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return source.getShort(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return source.getShort(rowId);
+        }
+    }
+
+    /** Reads {@code int} data as {@code long}, {@code float} or {@code double}. */
+    private static class IntUpConverter extends UpConverter {
+        IntUpConverter(DataType targetType, ColumnVector source) {
+            super(targetType, source);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return source.getInt(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return source.getInt(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return source.getInt(rowId);
+        }
+    }
+
+    /** Reads {@code long} data as {@code float} or {@code double}. */
+    private static class LongUpConverter extends UpConverter {
+        LongUpConverter(DataType targetType, ColumnVector source) {
+            super(targetType, source);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return source.getLong(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return source.getLong(rowId);
+        }
+    }
+
+    /** Reads {@code float} data as {@code double}. */
+    private static class FloatUpConverter extends UpConverter {
+        FloatUpConverter(DataType targetType, ColumnVector source) {
+            super(targetType, source);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return source.getFloat(rowId);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
new file mode 100644
index 0000000..c24b225
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility methods to evaluate {@code partition_value} expression
+ */
+class PartitionValueEvaluator {
+    private PartitionValueEvaluator() {
+    }
+
+    /**
+     * Evaluate the {@code partition_value} expression for given input column vector and generate
+     * a column vector with decoded values according to the given partition type.
+     *
+     * @param input vector holding the partition values in their serialized string form
+     * @param partitionType data type the partition values are decoded to
+     * @return vector that parses the string at a row on every typed read of that row
+     */
+    static ColumnVector eval(ColumnVector input, DataType partitionType) {
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return partitionType;
+            }
+
+            @Override
+            public int getSize() {
+                return input.getSize();
+            }
+
+            @Override
+            public void close() {
+                input.close();
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return input.isNullAt(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return Boolean.parseBoolean(input.getString(rowId));
+            }
+
+            @Override
+            public byte getByte(int rowId) {
+                return Byte.parseByte(input.getString(rowId));
+            }
+
+            @Override
+            public short getShort(int rowId) {
+                return Short.parseShort(input.getString(rowId));
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                if (partitionType.equivalent(IntegerType.INTEGER)) {
+                    return Integer.parseInt(input.getString(rowId));
+                } else if (partitionType.equivalent(DateType.DATE)) {
+                    return InternalUtils.daysSinceEpoch(Date.valueOf(input.getString(rowId)));
+                }
+                throw new UnsupportedOperationException("Invalid value request for data type");
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                if (partitionType.equivalent(LongType.LONG)) {
+                    return Long.parseLong(input.getString(rowId));
+                } else if (partitionType.equivalent(TimestampType.TIMESTAMP)
+                        || partitionType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
+                    return parseTimestampMicros(input.getString(rowId));
+                }
+                throw new UnsupportedOperationException("Invalid value request for data type");
+            }
+
+            @Override
+            public float getFloat(int rowId) {
+                return Float.parseFloat(input.getString(rowId));
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return Double.parseDouble(input.getString(rowId));
+            }
+
+            @Override
+            public byte[] getBinary(int rowId) {
+                // Use a fixed charset so the bytes do not depend on the platform default.
+                return input.isNullAt(rowId) ? null : input.getString(rowId).getBytes(StandardCharsets.UTF_8);
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return input.getString(rowId);
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                return input.isNullAt(rowId) ? null : new BigDecimal(input.getString(rowId));
+            }
+        };
+    }
+
+    /**
+     * Converts a serialized timestamp partition value to microseconds since the epoch.
+     * <p>
+     * The JDBC escape format ({@code yyyy-[m]m-[d]d hh:mm:ss[.f...]}) is tried first; such a value
+     * has no timezone info, so it is interpreted in local time zone. A value that is not in that
+     * format is parsed as an ISO-8601 instant (for example {@code 2024-01-01T10:15:30Z}).
+     *
+     * @param value serialized timestamp partition value
+     * @return microseconds between the epoch and the parsed timestamp
+     * @throws DateTimeParseException if the value is in neither format
+     */
+    private static long parseTimestampMicros(String value) {
+        try {
+            return InternalUtils.microsSinceEpoch(Timestamp.valueOf(value));
+        } catch (IllegalArgumentException notJdbcFormat) {
+            try {
+                return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(value));
+            } catch (DateTimeParseException notIsoFormat) {
+                // Keep the first failure attached so that neither parse error is lost.
+                notIsoFormat.addSuppressed(notJdbcFormat);
+                throw notIsoFormat;
+            }
+        }
+    }
+}