[ASTERIXDB-3154] Adds Apache Iceberg table read support
- user model changes: yes
- Introduced two new Iceberg-specific DDL parameters: "table-format" and "metadata-path"
- storage format changes: no
- interface changes: no
Details:
These changes add support for reading Apache Iceberg tables (format version 1) as external data in
AsterixDB. Iceberg table details are specified in the external dataset DDL, and every query reads data
from the latest Iceberg snapshot, as illustrated by the sketch below.
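For illustration, a minimal external dataset DDL against the S3 adapter, adapted from the test
queries added by this change (the credentials, endpoint, and names are test placeholders; the new
"metadata-path" parameter is not exercised in this sketch):

    DROP DATAVERSE test IF EXISTS;
    CREATE DATAVERSE test;
    USE test;

    CREATE TYPE IcebergTableType AS {
    };

    CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
      ("accessKeyId"="dummyAccessKey"),
      ("secretAccessKey"="dummySecretKey"),
      ("region"="us-west-2"),
      ("serviceEndpoint"="http://localhost:8001"),
      ("container"="iceberg-container"),
      ("definition"="my-table"),
      ("table-format"="apache-iceberg"),
      ("format"="parquet")
    );

Queries over IcebergDataset then return the rows in the table's latest snapshot.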
APE link: https://cwiki.apache.org/confluence/display/ASTERIXDB/APE+1%3A+Iceberg+API+Integration
* External adapters modified: AWS S3 and Hadoop
* Removed the kite-sdk dependency and extracted its schema inference utilities (see the sketch after this list)
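As a sketch of the replacement schema-inference path in the tests, the extracted JsonUtil infers an
Avro schema and the json2avro converter replaces kite-sdk's JSONFileReader (jsonFile and jsonLine are
hypothetical placeholders; 20 is an arbitrary record count for inference):

    import java.io.FileInputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData.Record;
    import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

    // Infer an Avro schema from the first records of a line-delimited JSON
    // file, then convert a JSON line into an Avro record for Parquet writing.
    try (FileInputStream schemaIn = new FileInputStream(jsonFile)) {
        Schema schema = JsonUtil.inferSchema(schemaIn, "parquet_schema", 20);
        Record record = new JsonAvroConverter()
                .convertToGenericDataRecord(jsonLine.getBytes(), schema);
    }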
Change-Id: I12df589a6dffdc5af4a5cace68a11729995ea9af
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17419
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index baa3be1..a3518852 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -878,11 +878,65 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
</dependency>
- <!-- Needed for inferring the schema for writing Parquet files -->
<dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>1.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>1.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>1.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>1.12.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>tech.allegro.schema.json2avro</groupId>
+ <artifactId>converter</artifactId>
+ <version>0.2.15</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
index 6c07fab..c3f22a4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
@@ -47,6 +47,7 @@
PREPARE_FIXED_DATA_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareFixedDataBucket;
PREPARE_MIXED_DATA_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareMixedDataBucket;
PREPARE_BOM_FILE_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareBomDataBucket;
+ PREPARE_ICEBERG_TABLE_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareIcebergTableBucket;
return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
}
@@ -61,4 +62,7 @@
private static void prepareBomDataBucket() {
}
+
+ private static void prepareIcebergTableBucket() {
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 785e676..ed167e6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -23,6 +23,8 @@
import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
import static org.apache.hyracks.util.file.FileUtil.joinPath;
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -41,6 +43,8 @@
import java.util.zip.GZIPOutputStream;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.test.common.TestConstants;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -50,7 +54,27 @@
import org.apache.asterix.testframework.xml.TestCase;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Tables;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
@@ -91,6 +115,8 @@
static Runnable PREPARE_MIXED_DATA_BUCKET;
static Runnable PREPARE_BOM_FILE_BUCKET;
+ static Runnable PREPARE_ICEBERG_TABLE_BUCKET;
+
// Base directory paths for data files
private static final String JSON_DATA_PATH = joinPath("data", "json");
private static final String CSV_DATA_PATH = joinPath("data", "csv");
@@ -117,6 +143,8 @@
public static final String FIXED_DATA_CONTAINER = "fixed-data"; // Do not use, has fixed data
public static final String INCLUDE_EXCLUDE_CONTAINER = "include-exclude";
public static final String BOM_FILE_CONTAINER = "bom-file-container";
+ public static final String ICEBERG_TABLE_CONTAINER = "iceberg-container";
+
public static final PutObjectRequest.Builder playgroundBuilder =
PutObjectRequest.builder().bucket(PLAYGROUND_CONTAINER);
public static final PutObjectRequest.Builder fixedDataBuilder =
@@ -126,10 +154,33 @@
public static final PutObjectRequest.Builder bomFileContainerBuilder =
PutObjectRequest.builder().bucket(BOM_FILE_CONTAINER);
+ public static final PutObjectRequest.Builder icebergContainerBuilder =
+ PutObjectRequest.builder().bucket(ICEBERG_TABLE_CONTAINER);
+
public AwsS3ExternalDatasetTest(TestCaseContext tcCtx) {
this.tcCtx = tcCtx;
}
+ // Iceberg test table setup
+
+ private static final Schema SCHEMA =
+ new Schema(required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));
+ private static final Configuration CONF = new Configuration();
+
+ private static final String ICEBERG_TABLE_PATH = "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table/";
+ private static final String ICEBERG_TABLE_PATH_FORMAT_VERSION_2 =
+ "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-format-version-2/";
+ private static final String ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT =
+ "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-mixed-data-format/";
+
+ private static final String ICEBERG_TABLE_PATH_EMPTY = "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-empty/";
+
+ private static final String ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES =
+ "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-multiple-data-files/";
+
+ private static final String ICEBERG_TABLE_PATH_MODIFIED_DATA =
+ "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-modified-data/";
+
@BeforeClass
public static void setUp() throws Exception {
final TestExecutor testExecutor = new AwsTestExecutor();
@@ -154,6 +205,138 @@
LOGGER.info("S3 mock down and client shut down successfully");
}
+ private static DataFile writeFile(String filename, List<Record> records, String location) throws IOException {
+ Path path = new Path(location, filename);
+ FileFormat fileFormat = FileFormat.fromFileName(filename);
+ Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
+
+ FileAppender<Record> fileAppender = new GenericAppenderFactory(AwsS3ExternalDatasetTest.SCHEMA)
+ .newAppender(fromPath(path, CONF), fileFormat);
+ try (FileAppender<Record> appender = fileAppender) {
+ appender.addAll(records);
+ }
+
+ return DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path, CONF))
+ .withMetrics(fileAppender.metrics()).build();
+ }
+
+ private static void prepareIcebergConfiguration() {
+ CONF.set(S3Constants.HADOOP_SERVICE_END_POINT, MOCK_SERVER_HOSTNAME);
+ // switch to http
+ CONF.set("fs.s3a.connection.ssl.enabled", "false");
+ // Forces path-style access, which the mock server requires; overrides the DNS-based bucket access scheme.
+ CONF.set("fs.s3a.path.style.access", "true");
+ // Mock server doesn't support concurrency control
+ CONF.set("fs.s3a.change.detection.version.required", "false");
+ CONF.set(S3Constants.HADOOP_ACCESS_KEY_ID, TestConstants.S3_ACCESS_KEY_ID_DEFAULT);
+ CONF.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, TestConstants.S3_SECRET_ACCESS_KEY_DEFAULT);
+ }
+
+ public static void prepareIcebergTableContainer() {
+ prepareIcebergConfiguration();
+ Tables tables = new HadoopTables(CONF);
+
+ // test data
+ Record genericRecord = GenericRecord.create(SCHEMA);
+
+ List<Record> fileFirstSnapshotRecords =
+ ImmutableList.of(genericRecord.copy(ImmutableMap.of("id", 0, "data", "vibrant_mclean")),
+ genericRecord.copy(ImmutableMap.of("id", 1, "data", "frosty_wilson")),
+ genericRecord.copy(ImmutableMap.of("id", 2, "data", "serene_kirby")));
+
+ List<Record> fileSecondSnapshotRecords =
+ ImmutableList.of(genericRecord.copy(ImmutableMap.of("id", 3, "data", "peaceful_pare")),
+ genericRecord.copy(ImmutableMap.of("id", 4, "data", "laughing_mahavira")),
+ genericRecord.copy(ImmutableMap.of("id", 5, "data", "vibrant_lamport")));
+
+ // create the table
+ Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()), ICEBERG_TABLE_PATH);
+
+ // load test data
+ try {
+ DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH);
+ table.newAppend().appendFile(file).commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // create a table with unsupported iceberg version
+ Table unsupportedTable = tables.create(SCHEMA,
+ PartitionSpec.unpartitioned(), ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.PARQUET.name(), TableProperties.FORMAT_VERSION, "2"),
+ ICEBERG_TABLE_PATH_FORMAT_VERSION_2);
+
+ // load test data
+ try {
+ DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_FORMAT_VERSION_2);
+ unsupportedTable.newAppend().appendFile(file).commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // create a table with mix of parquet and avro data files
+ Table mixedDataFormats = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+ ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
+
+ // load test data
+ try {
+ DataFile parquetFile = writeFile(FileFormat.PARQUET.addExtension("parquet-file"), fileFirstSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
+ DataFile avroFile = writeFile(FileFormat.AVRO.addExtension("avro-file"), fileSecondSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
+
+ mixedDataFormats.newAppend().appendFile(parquetFile).appendFile(avroFile).commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // empty table
+ tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+ ICEBERG_TABLE_PATH_EMPTY);
+
+ // multiple data files
+
+ Table multipleDataFiles = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+ ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
+
+ // load test data
+ try {
+ DataFile file1 = writeFile(FileFormat.PARQUET.addExtension("file-1"), fileFirstSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
+ DataFile file2 = writeFile(FileFormat.PARQUET.addExtension("file-2"), fileSecondSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
+
+ multipleDataFiles.newAppend().appendFile(file1).appendFile(file2).commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // modify data
+ Table modifiedData = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
+ ICEBERG_TABLE_PATH_MODIFIED_DATA);
+
+ // load test data
+ try {
+ DataFile file1 = writeFile(FileFormat.PARQUET.addExtension("file-1"), fileFirstSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MODIFIED_DATA);
+ DataFile file2 = writeFile(FileFormat.PARQUET.addExtension("file-2"), fileSecondSnapshotRecords,
+ AwsS3ExternalDatasetTest.ICEBERG_TABLE_PATH_MODIFIED_DATA);
+
+ modifiedData.newAppend().appendFile(file1).appendFile(file2).commit();
+ modifiedData.newDelete().deleteFile(file1).commit();
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Parameters(name = "AwsS3ExternalDatasetTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
SUITE_TESTS = "testsuite_external_dataset_s3.xml";
@@ -163,6 +346,7 @@
PREPARE_FIXED_DATA_BUCKET = ExternalDatasetTestUtils::prepareFixedDataContainer;
PREPARE_MIXED_DATA_BUCKET = ExternalDatasetTestUtils::prepareMixedDataContainer;
PREPARE_BOM_FILE_BUCKET = ExternalDatasetTestUtils::prepareBomFileContainer;
+ PREPARE_ICEBERG_TABLE_BUCKET = AwsS3ExternalDatasetTest::prepareIcebergTableContainer;
return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
}
@@ -206,6 +390,7 @@
client.createBucket(CreateBucketRequest.builder().bucket(FIXED_DATA_CONTAINER).build());
client.createBucket(CreateBucketRequest.builder().bucket(INCLUDE_EXCLUDE_CONTAINER).build());
client.createBucket(CreateBucketRequest.builder().bucket(BOM_FILE_CONTAINER).build());
+ client.createBucket(CreateBucketRequest.builder().bucket(ICEBERG_TABLE_CONTAINER).build());
LOGGER.info("Client created successfully");
// Create the bucket and upload some json files
@@ -216,6 +401,7 @@
PREPARE_FIXED_DATA_BUCKET.run();
PREPARE_MIXED_DATA_BUCKET.run();
PREPARE_BOM_FILE_BUCKET.run();
+ PREPARE_ICEBERG_TABLE_BUCKET.run();
}
private static void loadPlaygroundData(String key, String content, boolean fromFile, boolean gzipped) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
index 96a8703..769480f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/BinaryFileConverterUtil.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.test.external_dataset.parquet;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -28,10 +30,9 @@
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.hyracks.api.util.IoUtil;
-import org.kitesdk.data.spi.JsonUtil;
-import org.kitesdk.data.spi.filesystem.JSONFileReader;
+import org.apache.parquet.avro.AvroParquetWriter;
-import parquet.avro.AvroParquetWriter;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
public class BinaryFileConverterUtil {
public static final String DEFAULT_PARQUET_SRC_PATH = "data/hdfs/parquet";
@@ -70,15 +71,15 @@
private static void writeParquetFile(File jsonInputPath, Path parquetOutputPath) throws IOException {
FileInputStream schemaInputStream = new FileInputStream(jsonInputPath);
- FileInputStream jsonInputStream = new FileInputStream(jsonInputPath);
//Infer Avro schema
Schema inputSchema = JsonUtil.inferSchema(schemaInputStream, "parquet_schema", NUM_OF_RECORDS_SCHEMA);
- try (JSONFileReader<Record> reader = new JSONFileReader<>(jsonInputStream, inputSchema, Record.class)) {
- reader.initialize();
- try (AvroParquetWriter<Record> writer = new AvroParquetWriter<>(parquetOutputPath, inputSchema)) {
- for (Record record : reader) {
- writer.write(record);
- }
+ try (BufferedReader reader = new BufferedReader(new FileReader(jsonInputPath));
+ AvroParquetWriter<Record> writer = new AvroParquetWriter<>(parquetOutputPath, inputSchema)) {
+ JsonAvroConverter converter = new JsonAvroConverter();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ Record record = converter.convertToGenericDataRecord(line.getBytes(), inputSchema);
+ writer.write(record);
}
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/JsonUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/JsonUtil.java
new file mode 100644
index 0000000..69f9f58
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/JsonUtil.java
@@ -0,0 +1,614 @@
+/*
+ * Copyright 2013 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.test.external_dataset.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BinaryNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/*
+ * Copied from kite-sdk and modified to work with the latest apache-avro version.
+ */
+public class JsonUtil {
+
+ private static final JsonFactory FACTORY = new JsonFactory();
+
+ public static Iterator<JsonNode> parser(final InputStream stream) {
+ try {
+ JsonParser parser = FACTORY.createParser(stream);
+ parser.setCodec(new ObjectMapper());
+ return parser.readValuesAs(JsonNode.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot read from stream", e);
+ }
+ }
+
+ public static JsonNode parse(String json) {
+ return parse(json, JsonNode.class);
+ }
+
+ public static <T> T parse(String json, Class<T> returnType) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ return mapper.readValue(json, returnType);
+ } catch (JsonParseException | JsonMappingException e) {
+ throw new RuntimeException("Invalid JSON", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize JSON parser", e);
+ }
+ }
+
+ public static JsonNode parse(File file) {
+ return parse(file, JsonNode.class);
+ }
+
+ public static <T> T parse(File file, Class<T> returnType) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ return mapper.readValue(file, returnType);
+ } catch (JsonParseException | JsonMappingException e) {
+ throw new RuntimeException("Invalid JSON", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize JSON parser", e);
+ }
+ }
+
+ public static JsonNode parse(InputStream in) {
+ return parse(in, JsonNode.class);
+ }
+
+ public static <T> T parse(InputStream in, Class<T> returnType) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ return mapper.readValue(in, returnType);
+ } catch (JsonParseException | JsonMappingException e) {
+ throw new RuntimeException("Invalid JSON", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize JSON parser", e);
+ }
+ }
+
+ public abstract static class JsonTreeVisitor<T> {
+ protected LinkedList<String> recordLevels = Lists.newLinkedList();
+
+ public T object(ObjectNode object, Map<String, T> fields) {
+ return null;
+ }
+
+ public T array(ArrayNode array, List<T> elements) {
+ return null;
+ }
+
+ public T binary(BinaryNode binary) {
+ return null;
+ }
+
+ public T text(TextNode text) {
+ return null;
+ }
+
+ public T number(NumericNode number) {
+ return null;
+ }
+
+ public T bool(BooleanNode bool) {
+ return null;
+ }
+
+ public T missing(MissingNode missing) {
+ return null;
+ }
+
+ public T nullNode(NullNode nullNode) {
+ return null;
+ }
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", justification = "Uses precondition to validate casts")
+ public static <T> T visit(JsonNode node, JsonTreeVisitor<T> visitor) {
+ switch (node.getNodeType()) {
+ case OBJECT:
+ Preconditions.checkArgument(node instanceof ObjectNode, "Expected instance of ObjectNode: " + node);
+
+ // use LinkedHashMap to preserve field order
+ Map<String, T> fields = Maps.newLinkedHashMap();
+
+ Iterator<Map.Entry<String, JsonNode>> iter = node.fields();
+ while (iter.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iter.next();
+
+ visitor.recordLevels.push(entry.getKey());
+ fields.put(entry.getKey(), visit(entry.getValue(), visitor));
+ visitor.recordLevels.pop();
+ }
+
+ return visitor.object((ObjectNode) node, fields);
+
+ case ARRAY:
+ Preconditions.checkArgument(node instanceof ArrayNode, "Expected instance of ArrayNode: " + node);
+
+ List<T> elements = Lists.newArrayListWithExpectedSize(node.size());
+
+ for (JsonNode element : node) {
+ elements.add(visit(element, visitor));
+ }
+
+ return visitor.array((ArrayNode) node, elements);
+
+ case BINARY:
+ Preconditions.checkArgument(node instanceof BinaryNode, "Expected instance of BinaryNode: " + node);
+ return visitor.binary((BinaryNode) node);
+
+ case STRING:
+ Preconditions.checkArgument(node instanceof TextNode, "Expected instance of TextNode: " + node);
+
+ return visitor.text((TextNode) node);
+
+ case NUMBER:
+ Preconditions.checkArgument(node instanceof NumericNode, "Expected instance of NumericNode: " + node);
+
+ return visitor.number((NumericNode) node);
+
+ case BOOLEAN:
+ Preconditions.checkArgument(node instanceof BooleanNode, "Expected instance of BooleanNode: " + node);
+
+ return visitor.bool((BooleanNode) node);
+
+ case MISSING:
+ Preconditions.checkArgument(node instanceof MissingNode, "Expected instance of MissingNode: " + node);
+
+ return visitor.missing((MissingNode) node);
+
+ case NULL:
+ Preconditions.checkArgument(node instanceof NullNode, "Expected instance of NullNode: " + node);
+
+ return visitor.nullNode((NullNode) node);
+
+ default:
+ throw new IllegalArgumentException("Unknown node type: " + node.getNodeType() + ": " + node);
+ }
+ }
+
+ /**
+ * Precondition-style validation that throws a {@link RuntimeException}.
+ *
+ * @param isValid
+ * {@code true} if valid, {@code false} if an exception should be thrown
+ * @param message
+ * A String message for the exception.
+ */
+ public static void check(boolean isValid, String message, Object... args) {
+ if (!isValid) {
+ String[] argStrings = new String[args.length];
+ for (int i = 0; i < args.length; i += 1) {
+ argStrings[i] = String.valueOf(args[i]);
+ }
+ throw new RuntimeException(String.format(String.valueOf(message), (Object[]) argStrings));
+ }
+ }
+
+ public static Object convertToAvro(GenericData model, JsonNode datum, Schema schema) {
+ if (datum == null) {
+ return null;
+ }
+ switch (schema.getType()) {
+ case RECORD:
+ check(datum.isObject(), "Cannot convert non-object to record: %s", datum);
+ Object record = model.newRecord(null, schema);
+ for (Schema.Field field : schema.getFields()) {
+ model.setField(record, field.name(), field.pos(),
+ convertField(model, datum.get(field.name()), field));
+ }
+ return record;
+
+ case MAP:
+ check(datum.isObject(), "Cannot convert non-object to map: %s", datum);
+ Map<String, Object> map = Maps.newLinkedHashMap();
+ Iterator<Map.Entry<String, JsonNode>> iter = datum.fields();
+ while (iter.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iter.next();
+ map.put(entry.getKey(), convertToAvro(model, entry.getValue(), schema.getValueType()));
+ }
+ return map;
+
+ case ARRAY:
+ check(datum.isArray(), "Cannot convert to array: %s", datum);
+ List<Object> list = Lists.newArrayListWithExpectedSize(datum.size());
+ for (JsonNode element : datum) {
+ list.add(convertToAvro(model, element, schema.getElementType()));
+ }
+ return list;
+
+ case UNION:
+ return convertToAvro(model, datum, resolveUnion(datum, schema.getTypes()));
+
+ case BOOLEAN:
+ check(datum.isBoolean(), "Cannot convert to boolean: %s", datum);
+ return datum.booleanValue();
+
+ case FLOAT:
+ check(datum.isFloat() || datum.isInt(), "Cannot convert to float: %s", datum);
+ return datum.floatValue();
+
+ case DOUBLE:
+ check(datum.isDouble() || datum.isFloat() || datum.isLong() || datum.isInt(),
+ "Cannot convert to double: %s", datum);
+ return datum.doubleValue();
+
+ case INT:
+ check(datum.isInt(), "Cannot convert to int: %s", datum);
+ return datum.intValue();
+
+ case LONG:
+ check(datum.isLong() || datum.isInt(), "Cannot convert to long: %s", datum);
+ return datum.longValue();
+
+ case STRING:
+ check(datum.isTextual(), "Cannot convert to string: %s", datum);
+ return datum.textValue();
+
+ case ENUM:
+ check(datum.isTextual(), "Cannot convert to string: %s", datum);
+ return model.createEnum(datum.textValue(), schema);
+
+ case BYTES:
+ check(datum.isBinary(), "Cannot convert to binary: %s", datum);
+ try {
+ return ByteBuffer.wrap(datum.binaryValue());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read JSON binary", e);
+ }
+
+ case FIXED:
+ check(datum.isBinary(), "Cannot convert to fixed: %s", datum);
+ byte[] bytes;
+ try {
+ bytes = datum.binaryValue();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read JSON binary", e);
+ }
+ check(bytes.length < schema.getFixedSize(), "Binary data is too short: %s bytes for %s", bytes.length,
+ schema);
+ return model.createFixed(null, bytes, schema);
+
+ case NULL:
+ return null;
+
+ default:
+ // don't use DatasetRecordException because this is a Schema problem
+ throw new IllegalArgumentException("Unknown schema type: " + schema);
+ }
+ }
+
+ private static Object convertField(GenericData model, JsonNode datum, Schema.Field field) {
+ try {
+ Object value = convertToAvro(model, datum, field.schema());
+ if (value != null || SchemaUtil.nullOk(field.schema())) {
+ return value;
+ } else {
+ return model.getDefaultValue(field);
+ }
+ } catch (AvroRuntimeException e) {
+ throw new RuntimeException(
+ String.format("Field %s: cannot make %s value: '%s'", field.name(), field.schema(), datum), e);
+ } catch (RuntimeException e) {
+ // add the field name to the error message
+ throw new RuntimeException(String.format("Cannot convert field %s", field.name()), e);
+ }
+ }
+
+ private static Schema resolveUnion(JsonNode datum, Collection<Schema> schemas) {
+ Set<Schema.Type> primitives = Sets.newHashSet();
+ List<Schema> others = Lists.newArrayList();
+ for (Schema schema : schemas) {
+ if (PRIMITIVES.containsKey(schema.getType())) {
+ primitives.add(schema.getType());
+ } else {
+ others.add(schema);
+ }
+ }
+
+ // Try to identify specific primitive types
+ Schema primitiveSchema = null;
+ if (datum == null || datum.isNull()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.NULL);
+ } else if (datum.isShort() || datum.isInt()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT,
+ Schema.Type.DOUBLE);
+ } else if (datum.isLong()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.LONG, Schema.Type.DOUBLE);
+ } else if (datum.isFloat()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.FLOAT, Schema.Type.DOUBLE);
+ } else if (datum.isDouble()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.DOUBLE);
+ } else if (datum.isBoolean()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.BOOLEAN);
+ }
+
+ if (primitiveSchema != null) {
+ return primitiveSchema;
+ }
+
+ // otherwise, select the first schema that matches the datum
+ for (Schema schema : others) {
+ if (matches(datum, schema)) {
+ return schema;
+ }
+ }
+
+ throw new RuntimeException(String.format("Cannot resolve union: %s not in %s", datum, schemas));
+ }
+
+ // this does not contain string, bytes, or fixed because the datum type
+ // doesn't necessarily determine the schema.
+ private static final ImmutableMap<Schema.Type, Schema> PRIMITIVES = ImmutableMap.<Schema.Type, Schema> builder()
+ .put(Schema.Type.NULL, Schema.create(Schema.Type.NULL))
+ .put(Schema.Type.BOOLEAN, Schema.create(Schema.Type.BOOLEAN))
+ .put(Schema.Type.INT, Schema.create(Schema.Type.INT)).put(Schema.Type.LONG, Schema.create(Schema.Type.LONG))
+ .put(Schema.Type.FLOAT, Schema.create(Schema.Type.FLOAT))
+ .put(Schema.Type.DOUBLE, Schema.create(Schema.Type.DOUBLE)).build();
+
+ private static Schema closestPrimitive(Set<Schema.Type> possible, Schema.Type... types) {
+ for (Schema.Type type : types) {
+ if (possible.contains(type) && PRIMITIVES.containsKey(type)) {
+ return PRIMITIVES.get(type);
+ }
+ }
+ return null;
+ }
+
+ private static boolean matches(JsonNode datum, Schema schema) {
+ switch (schema.getType()) {
+ case RECORD:
+ if (datum.isObject()) {
+ // check that each field is present or has a default
+ boolean missingField = false;
+ for (Schema.Field field : schema.getFields()) {
+ if (!datum.has(field.name()) && !field.hasDefaultValue()) {
+ missingField = true;
+ break;
+ }
+ }
+ if (!missingField) {
+ return true;
+ }
+ }
+ break;
+ case UNION:
+ if (resolveUnion(datum, schema.getTypes()) != null) {
+ return true;
+ }
+ break;
+ case MAP:
+ if (datum.isObject()) {
+ return true;
+ }
+ break;
+ case ARRAY:
+ if (datum.isArray()) {
+ return true;
+ }
+ break;
+ case BOOLEAN:
+ if (datum.isBoolean()) {
+ return true;
+ }
+ break;
+ case FLOAT:
+ if (datum.isFloat() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case DOUBLE:
+ if (datum.isDouble() || datum.isFloat() || datum.isLong() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case INT:
+ if (datum.isInt()) {
+ return true;
+ }
+ break;
+ case LONG:
+ if (datum.isLong() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case STRING:
+ if (datum.isTextual()) {
+ return true;
+ }
+ break;
+ case ENUM:
+ if (datum.isTextual() && schema.hasEnumSymbol(datum.textValue())) {
+ return true;
+ }
+ break;
+ case BYTES:
+ case FIXED:
+ if (datum.isBinary()) {
+ return true;
+ }
+ break;
+ case NULL:
+ if (datum == null || datum.isNull()) {
+ return true;
+ }
+ break;
+ default: // UNION or unknown
+ throw new IllegalArgumentException("Unsupported schema: " + schema);
+ }
+ return false;
+ }
+
+ public static Schema inferSchema(InputStream incoming, final String name, int numRecords) {
+ Iterator<Schema> schemas = Iterators.transform(parser(incoming), node -> inferSchema(node, name));
+
+ if (!schemas.hasNext()) {
+ return null;
+ }
+
+ Schema result = schemas.next();
+ for (int i = 1; schemas.hasNext() && i < numRecords; i += 1) {
+ result = SchemaUtil.merge(result, schemas.next());
+ }
+
+ return result;
+ }
+
+ public static Schema inferSchema(JsonNode node, String name) {
+ return visit(node, new JsonSchemaVisitor(name));
+ }
+
+ private static class JsonSchemaVisitor extends JsonTreeVisitor<Schema> {
+
+ private static final Joiner DOT = Joiner.on('.');
+ private final String name;
+ private boolean objectsToRecords = true;
+
+ public JsonSchemaVisitor(String name) {
+ this.name = name;
+ }
+
+ public JsonSchemaVisitor useMaps() {
+ this.objectsToRecords = false;
+ return this;
+ }
+
+ @Override
+ public Schema object(ObjectNode object, Map<String, Schema> fields) {
+ if (objectsToRecords || recordLevels.size() < 1) {
+ List<Schema.Field> recordFields = Lists.newArrayListWithExpectedSize(fields.size());
+
+ for (Map.Entry<String, Schema> entry : fields.entrySet()) {
+ recordFields.add(new Schema.Field(entry.getKey(), entry.getValue(),
+ "Type inferred from '" + object.get(entry.getKey()) + "'", null));
+ }
+
+ Schema recordSchema;
+ if (recordLevels.size() < 1) {
+ recordSchema = Schema.createRecord(name, null, null, false);
+ } else {
+ recordSchema = Schema.createRecord(DOT.join(recordLevels), null, null, false);
+ }
+
+ recordSchema.setFields(recordFields);
+
+ return recordSchema;
+
+ } else {
+ // translate to a map; use LinkedHashSet to preserve schema order
+ switch (fields.size()) {
+ case 0:
+ return Schema.createMap(Schema.create(Schema.Type.NULL));
+ case 1:
+ return Schema.createMap(Iterables.getOnlyElement(fields.values()));
+ default:
+ return Schema.createMap(SchemaUtil.mergeOrUnion(fields.values()));
+ }
+ }
+ }
+
+ @Override
+ public Schema array(ArrayNode ignored, List<Schema> elementSchemas) {
+ // use LinkedHashSet to preserve schema order
+ switch (elementSchemas.size()) {
+ case 0:
+ return Schema.createArray(Schema.create(Schema.Type.NULL));
+ case 1:
+ return Schema.createArray(Iterables.getOnlyElement(elementSchemas));
+ default:
+ return Schema.createArray(SchemaUtil.mergeOrUnion(elementSchemas));
+ }
+ }
+
+ @Override
+ public Schema binary(BinaryNode ignored) {
+ return Schema.create(Schema.Type.BYTES);
+ }
+
+ @Override
+ public Schema text(TextNode ignored) {
+ return Schema.create(Schema.Type.STRING);
+ }
+
+ @Override
+ public Schema number(NumericNode number) {
+ if (number.isInt()) {
+ return Schema.create(Schema.Type.INT);
+ } else if (number.isLong()) {
+ return Schema.create(Schema.Type.LONG);
+ } else if (number.isFloat()) {
+ return Schema.create(Schema.Type.FLOAT);
+ } else if (number.isDouble()) {
+ return Schema.create(Schema.Type.DOUBLE);
+ } else {
+ throw new UnsupportedOperationException(number.getClass().getName() + " is not supported");
+ }
+ }
+
+ @Override
+ public Schema bool(BooleanNode ignored) {
+ return Schema.create(Schema.Type.BOOLEAN);
+ }
+
+ @Override
+ public Schema nullNode(NullNode ignored) {
+ return Schema.create(Schema.Type.NULL);
+ }
+
+ @Override
+ public Schema missing(MissingNode ignored) {
+ throw new UnsupportedOperationException("MissingNode is not supported.");
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/SchemaUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/SchemaUtil.java
new file mode 100644
index 0000000..36cd8d0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/parquet/SchemaUtil.java
@@ -0,0 +1,557 @@
+/*
+ * Copyright 2013 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.test.external_dataset.parquet;
+
+import static org.apache.avro.Schema.Field.NULL_DEFAULT_VALUE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.commons.codec.binary.Base64;
+import org.codehaus.jackson.node.NullNode;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/*
+ * Copied from kite-sdk and modified to work with the latest apache-avro version.
+ */
+public class SchemaUtil {
+
+ public abstract static class SchemaVisitor<T> {
+ protected LinkedList<String> recordLevels = Lists.newLinkedList();
+
+ public T record(Schema record, List<String> names, List<T> fields) {
+ return null;
+ }
+
+ public T union(Schema union, List<T> options) {
+ return null;
+ }
+
+ public T array(Schema array, T element) {
+ return null;
+ }
+
+ public T map(Schema map, T value) {
+ return null;
+ }
+
+ public T primitive(Schema primitive) {
+ return null;
+ }
+ }
+
+ public static <T> T visit(Schema schema, SchemaVisitor<T> visitor) {
+ switch (schema.getType()) {
+ case RECORD:
+ // check to make sure this hasn't been visited before
+ String name = schema.getFullName();
+ Preconditions.checkState(!visitor.recordLevels.contains(name),
+ "Cannot process recursive Avro record %s", name);
+
+ visitor.recordLevels.push(name);
+
+ List<Schema.Field> fields = schema.getFields();
+ List<String> names = Lists.newArrayListWithExpectedSize(fields.size());
+ List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
+ for (Schema.Field field : schema.getFields()) {
+ names.add(field.name());
+ results.add(visit(field.schema(), visitor));
+ }
+
+ visitor.recordLevels.pop();
+
+ return visitor.record(schema, names, results);
+
+ case UNION:
+ List<Schema> types = schema.getTypes();
+ List<T> options = Lists.newArrayListWithExpectedSize(types.size());
+ for (Schema type : types) {
+ options.add(visit(type, visitor));
+ }
+ return visitor.union(schema, options);
+
+ case ARRAY:
+ return visitor.array(schema, visit(schema.getElementType(), visitor));
+
+ case MAP:
+ return visitor.map(schema, visit(schema.getValueType(), visitor));
+
+ default:
+ return visitor.primitive(schema);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static String toString(Object value, Schema schema) {
+ switch (schema.getType()) {
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return value.toString();
+ case STRING:
+ // TODO: could be null
+ try {
+ return URLEncoder.encode(value.toString(), "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Failed to encode value: " + value, e);
+ }
+ default:
+ // otherwise, encode as Avro binary and then base64
+ DatumWriter writer = ReflectData.get().createDatumWriter(schema);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+ try {
+ writer.write(value, encoder);
+ encoder.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot encode Avro value", e);
+ }
+ return Base64.encodeBase64URLSafeString(out.toByteArray());
+ }
+ }
+
+ /**
+ * Merges {@link Schema} instances if they are compatible.
+ * <p>
+ * Schemas are incompatible if:
+ * <ul>
+ * <li>The {@link Schema.Type} does not match.</li>
+ * <li>For record schemas, the record name does not match</li>
+ * <li>For enum schemas, the enum name does not match</li>
+ * </ul>
+ * <p>
+ * Map value, array element, and record field types will use unions if
+ * necessary, and union schemas are merged recursively.
+ *
+ * @param schemas
+ * a set of {@code Schema} instances to merge
+ * @return a merged {@code Schema}
+ * @throws RuntimeException
+ * if the schemas are not compatible
+ */
+ public static Schema merge(Iterable<Schema> schemas) {
+ Iterator<Schema> iter = schemas.iterator();
+ if (!iter.hasNext()) {
+ return null;
+ }
+ Schema result = iter.next();
+ while (iter.hasNext()) {
+ result = merge(result, iter.next());
+ }
+ return result;
+ }
+
+ /**
+ * Merges {@link Schema} instances and creates a union of schemas if any are incompatible.
+ * <p>
+ * Schemas are incompatible if:
+ * <ul>
+ * <li>The {@link Schema.Type} does not match.</li>
+ * <li>For record schemas, the record name does not match</li>
+ * <li>For enum schemas, the enum name does not match</li>
+ * </ul>
+ * <p>
+ * Map value, array element, and record field types will use unions if
+ * necessary, and union schemas are merged recursively.
+ *
+ * @param schemas
+ * a set of {@code Schema} instances to merge
+ * @return a combined {@code Schema}
+ */
+ public static Schema mergeOrUnion(Iterable<Schema> schemas) {
+ Iterator<Schema> iter = schemas.iterator();
+ if (!iter.hasNext()) {
+ return null;
+ }
+ Schema result = iter.next();
+ while (iter.hasNext()) {
+ result = mergeOrUnion(result, iter.next());
+ }
+ return result;
+ }
+
+ /**
+ * Precondition-style validation that throws a {@link RuntimeException}.
+ *
+ * @param isValid
+ * {@code true} if valid, {@code false} if an exception should be
+ * thrown
+ * @param message
+ * A String message for the exception.
+ */
+ public static void check(boolean isValid, String message, Object... args) {
+ if (!isValid) {
+ String[] argStrings = new String[args.length];
+ for (int i = 0; i < args.length; i += 1) {
+ argStrings[i] = String.valueOf(args[i]);
+ }
+ throw new RuntimeException(String.format(String.valueOf(message), (Object[]) argStrings));
+ }
+ }
+
+ /**
+ * Merges two {@link Schema} instances if they are compatible.
+ * <p>
+ * Two schemas are incompatible if:
+ * <ul>
+ * <li>The {@link Schema.Type} does not match.</li>
+ * <li>For record schemas, the record name does not match</li>
+ * <li>For enum schemas, the enum name does not match</li>
+ * </ul>
+ * <p>
+ * Map value and array element types will use unions if necessary, and union
+ * schemas are merged recursively.
+ *
+ * @param left
+ * a {@code Schema}
+ * @param right
+ * a {@code Schema}
+ * @return a merged {@code Schema}
+ * @throws RuntimeException
+ * if the schemas are not compatible
+ */
+ public static Schema merge(Schema left, Schema right) {
+ Schema merged = mergeOnly(left, right);
+ check(merged != null, "Cannot merge %s and %s", left, right);
+ return merged;
+ }
+
+ /**
+ * Merges two {@link Schema} instances or returns {@code null}.
+ * <p>
+ * The two schemas are merged if they are the same type. Records are merged if the two records have the same name or
+ * have no names but have a significant number of shared fields.
+ * <p>
+ *
+ * @param left
+ * a {@code Schema}
+ * @param right
+ * a {@code Schema}
+ * @return a {@code Schema} for both types
+ * @see {@link #mergeOrUnion} to return a union when a merge is not possible.
+ */
+ private static Schema mergeOrUnion(Schema left, Schema right) {
+ Schema merged = mergeOnly(left, right);
+ if (merged != null) {
+ return merged;
+ }
+ return union(left, right);
+ }
+
+ /**
+ * Creates a union of two {@link Schema} instances.
+ * <p>
+ * If either {@code Schema} is a union, this will attempt to merge the other schema with the types contained in that
+ * union before adding more types to the union that is produced.
+ * <p>
+ * If both schemas are not unions, no merge is attempted.
+ *
+ * @param left
+ * a {@code Schema}
+ * @param right
+ * a {@code Schema}
+ * @return a UNION schema of the two {@code Schema} instances
+ */
+ private static Schema union(Schema left, Schema right) {
+ if (left.getType() == Schema.Type.UNION) {
+ if (right.getType() == Schema.Type.UNION) {
+ // combine the unions by adding each type in right individually
+ Schema combined = left;
+ for (Schema type : right.getTypes()) {
+ combined = union(combined, type);
+ }
+ return combined;
+
+ } else {
+ boolean notMerged = true;
+ // combine a union with a non-union by checking if each type will merge
+ List<Schema> types = Lists.newArrayList();
+ Iterator<Schema> schemas = left.getTypes().iterator();
+ // try to merge each type and stop when one succeeds
+ while (schemas.hasNext()) {
+ Schema next = schemas.next();
+ Schema merged = mergeOnly(next, right);
+ if (merged != null) {
+ types.add(merged);
+ notMerged = false;
+ break;
+ } else {
+ // merge didn't work, add the type
+ types.add(next);
+ }
+ }
+ // add the remaining types from the left union
+ while (schemas.hasNext()) {
+ types.add(schemas.next());
+ }
+
+ if (notMerged) {
+ types.add(right);
+ }
+
+ return Schema.createUnion(types);
+ }
+ } else if (right.getType() == Schema.Type.UNION) {
+ return union(right, left);
+ }
+
+ return Schema.createUnion(ImmutableList.of(left, right));
+ }
+
+ /**
+ * Merges two {@link Schema} instances or returns {@code null}.
+ * <p>
+ * The two schemas are merged if they are the same type. Records are merged if the two records have the same name or
+ * have no names but have a significant number of shared fields.
+ * <p>
+ *
+ * @param left
+ * a {@code Schema}
+ * @param right
+ * a {@code Schema}
+ * @return a merged {@code Schema} or {@code null} if merging is not possible
+ * @see {@link #mergeOrUnion} to return a union when a merge is not possible.
+ */
+ private static Schema mergeOnly(Schema left, Schema right) {
+ if (Objects.equal(left, right)) {
+ return left;
+ }
+
+ // handle primitive type promotion; doesn't promote integers to floats
+ switch (left.getType()) {
+ case INT:
+ if (right.getType() == Schema.Type.LONG) {
+ return right;
+ }
+ break;
+ case LONG:
+ if (right.getType() == Schema.Type.INT) {
+ return left;
+ }
+ break;
+ case FLOAT:
+ if (right.getType() == Schema.Type.DOUBLE) {
+ return right;
+ }
+ break;
+ case DOUBLE:
+ if (right.getType() == Schema.Type.FLOAT) {
+ return left;
+ }
+ }
+
+ // any other cases where the types don't match must be combined by a union
+ if (left.getType() != right.getType()) {
+ return null;
+ }
+
+ switch (left.getType()) {
+ case UNION:
+ return union(left, right);
+ case RECORD:
+ if (left.getName() == null && right.getName() == null
+ && fieldSimilarity(left, right) < SIMILARITY_THRESH) {
+ return null;
+ } else if (!Objects.equal(left.getName(), right.getName())) {
+ return null;
+ }
+
+ Schema combinedRecord = Schema.createRecord(coalesce(left.getName(), right.getName()),
+ coalesce(left.getDoc(), right.getDoc()), coalesce(left.getNamespace(), right.getNamespace()),
+ false);
+ combinedRecord.setFields(mergeFields(left, right));
+
+ return combinedRecord;
+
+ case MAP:
+ return Schema.createMap(mergeOrUnion(left.getValueType(), right.getValueType()));
+
+ case ARRAY:
+ return Schema.createArray(mergeOrUnion(left.getElementType(), right.getElementType()));
+
+ case ENUM:
+ if (!Objects.equal(left.getName(), right.getName())) {
+ return null;
+ }
+ Set<String> symbols = Sets.newLinkedHashSet();
+ symbols.addAll(left.getEnumSymbols());
+ symbols.addAll(right.getEnumSymbols());
+ return Schema.createEnum(left.getName(), coalesce(left.getDoc(), right.getDoc()),
+ coalesce(left.getNamespace(), right.getNamespace()), ImmutableList.copyOf(symbols));
+
+ default:
+ // all primitives are handled before the switch by the equality check.
+ // schemas that reach this point are not primitives and also not any of
+ // the above known types.
+ throw new UnsupportedOperationException("Unknown schema type: " + left.getType());
+ }
+ }
+
+ private static final Schema NULL = Schema.create(Schema.Type.NULL);
+ private static final NullNode NULL_DEFAULT = NullNode.getInstance();
+
+ /**
+ * Returns a union {@link Schema} of NULL and the given {@code schema}.
+ * <p>
+ * A NULL schema is always the first type in the union so that a null default value can be set.
+ *
+ * @param schema
+ * a {@code Schema}
+ * @return a union of null and the given schema
+ */
+ private static Schema nullableForDefault(Schema schema) {
+ if (schema.getType() == Schema.Type.NULL) {
+ return schema;
+ }
+
+ if (schema.getType() != Schema.Type.UNION) {
+ return Schema.createUnion(ImmutableList.of(NULL, schema));
+ }
+
+ if (schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+ return schema;
+ }
+
+ List<Schema> types = Lists.newArrayList();
+ types.add(NULL);
+ for (Schema type : schema.getTypes()) {
+ if (type.getType() != Schema.Type.NULL) {
+ types.add(type);
+ }
+ }
+
+ return Schema.createUnion(types);
+ }
+
+ private static List<Schema.Field> mergeFields(Schema left, Schema right) {
+ List<Schema.Field> fields = Lists.newArrayList();
+ for (Schema.Field leftField : left.getFields()) {
+ Schema.Field rightField = right.getField(leftField.name());
+ if (rightField != null) {
+ fields.add(new Schema.Field(leftField.name(), mergeOrUnion(leftField.schema(), rightField.schema()),
+ coalesce(leftField.doc(), rightField.doc()),
+ coalesce(leftField.defaultVal(), rightField.defaultVal())));
+ } else {
+ if (leftField.hasDefaultValue()) {
+ fields.add(copy(leftField));
+ } else {
+ fields.add(new Schema.Field(leftField.name(), nullableForDefault(leftField.schema()),
+ leftField.doc(), NULL_DEFAULT_VALUE));
+ }
+ }
+ }
+
+ for (Schema.Field rightField : right.getFields()) {
+ if (left.getField(rightField.name()) == null) {
+ if (rightField.hasDefaultValue()) {
+ fields.add(copy(rightField));
+ } else {
+ fields.add(new Schema.Field(rightField.name(), nullableForDefault(rightField.schema()),
+ rightField.doc(), NULL_DEFAULT_VALUE));
+ }
+ }
+ }
+
+ return fields;
+ }
+
+ /**
+ * Creates a new field with the same name, schema, doc, and default value as the incoming schema.
+ * <p>
+ * Fields cannot be used in more than one record (not Immutable?).
+ */
+ public static Schema.Field copy(Schema.Field field) {
+ return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
+ }
+
+ private static float fieldSimilarity(Schema left, Schema right) {
+ // check whether the unnamed records appear to be the same record
+ Set<String> leftNames = names(left.getFields());
+ Set<String> rightNames = names(right.getFields());
+ int common = Sets.intersection(leftNames, rightNames).size();
+ float leftRatio = ((float) common) / ((float) leftNames.size());
+ float rightRatio = ((float) common) / ((float) rightNames.size());
+ return hmean(leftRatio, rightRatio);
+ }
+
+ private static Set<String> names(Collection<Schema.Field> fields) {
+ Set<String> names = Sets.newHashSet();
+ for (Schema.Field field : fields) {
+ names.add(field.name());
+ }
+ return names;
+ }
+
+ private static float SIMILARITY_THRESH = 0.3f;
+
+ private static float hmean(float left, float right) {
+ return (2.0f * left * right) / (left + right);
+ }
+
+ /**
+ * Returns the first non-null object that is passed in.
+ */
+ private static <E> E coalesce(E... objects) {
+ for (E object : objects) {
+ if (object != null) {
+ return object;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns whether null is allowed by the schema.
+ *
+ * @param schema
+ * a Schema
+ * @return true if schema allows the value to be null
+ */
+ public static boolean nullOk(Schema schema) {
+ if (Schema.Type.NULL == schema.getType()) {
+ return true;
+ } else if (Schema.Type.UNION == schema.getType()) {
+ for (Schema possible : schema.getTypes()) {
+ if (nullOk(possible)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
index 6ea87ef..fe3006e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
@@ -18,10 +18,36 @@
*/
package org.apache.asterix.test.runtime;
-import java.util.Collection;
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Tables;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,9 +62,56 @@
public class SqlppHdfsExecutionTest {
protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+ private static DataFile writeFile(String filename, List<Record> records, String location, Schema schema,
+ Configuration conf) throws IOException {
+ Path path = new Path(location, filename);
+ FileFormat fileFormat = FileFormat.fromFileName(filename);
+ Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
+
+ FileAppender<Record> fileAppender =
+ new GenericAppenderFactory(schema).newAppender(fromPath(path, conf), fileFormat);
+ try (FileAppender<Record> appender = fileAppender) {
+ appender.addAll(records);
+ }
+
+ return DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path, conf))
+ .withMetrics(fileAppender.metrics()).build();
+ }
+
+ private static void setUpIcebergData() {
+ Configuration conf = new Configuration();
+ conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, "hdfs://127.0.0.1:31888/");
+
+ Tables tables = new HadoopTables(conf);
+
+ Schema schema =
+ new Schema(required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));
+
+ String path = "hdfs://localhost:31888/my_table/";
+
+ Table table = tables.create(schema, PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()), path);
+
+ Record genericRecord = GenericRecord.create(schema);
+ List<Record> fileFirstSnapshotRecords =
+ ImmutableList.of(genericRecord.copy(ImmutableMap.of("id", 0, "data", "vibrant_mclean")),
+ genericRecord.copy(ImmutableMap.of("id", 1, "data", "frosty_wilson")),
+ genericRecord.copy(ImmutableMap.of("id", 2, "data", "serene_kirby")));
+
+ // load test data
+ try {
+ DataFile file =
+ writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords, path, schema, conf);
+ table.newAppend().appendFile(file).commit();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@BeforeClass
public static void setUp() throws Exception {
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor(), true);
+ setUpIcebergData();
}
@AfterClass
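
The setup above writes a single Parquet data file and commits it as the table's first snapshot; the SQL++ tests then read it through the new hdfs adapter options. As a sanity check, the same snapshot could be read back directly with Iceberg's generic reader; a minimal sketch, assuming the IcebergGenerics API from the iceberg-data module already on the test classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.io.CloseableIterable;

    class IcebergReadBackSketch {
        static void dump() throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://127.0.0.1:31888/");
            // Same location setUpIcebergData() created the table at.
            Table table = new HadoopTables(conf).load("hdfs://127.0.0.1:31888/my_table/");
            try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
                for (Record r : records) {
                    System.out.println(r.getField("id") + " -> " + r.getField("data"));
                }
            }
        }
    }
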
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.00.ddl.sqlpp
new file mode 100644
index 0000000..127a2a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-empty"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-empty/iceberg-empty.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.00.ddl.sqlpp
new file mode 100644
index 0000000..5123c78
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-invalid-path"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-invalid-location/iceberg-metadata-invalid-location.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.00.ddl.sqlpp
new file mode 100644
index 0000000..ee301bb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table#DATA_FILES"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-metadata-specific-location/iceberg-load-selective-metadata.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.00.ddl.sqlpp
new file mode 100644
index 0000000..f06c7ed
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-mixed-data-format"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-mixed-data-format/iceberg-mixed-data-format.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.00.ddl.sqlpp
new file mode 100644
index 0000000..09f2849
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-modified-data"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.00.ddl.sqlpp
new file mode 100644
index 0000000..4e1b811
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-multiple-data-files"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.query.sqlpp
new file mode 100644
index 0000000..6597859
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds ORDER BY ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.00.ddl.sqlpp
new file mode 100644
index 0000000..d37316f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table-format-version-2"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg-unsupported-version/iceberg-unsupported-version.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.00.ddl.sqlpp
new file mode 100644
index 0000000..526e75e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://localhost:8001"),
+ ("container"="iceberg-container"),
+ ("definition"="my-table"),
+ ("table-format"="apache-iceberg"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.1.ddl.sqlpp
new file mode 100644
index 0000000..f593e83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.1.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE IcebergTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET IcebergDataset(IcebergTableType) USING hdfs
+ (
+ ("hdfs"="hdfs://127.0.0.1:31888"),
+ ("input-format"="parquet-input-format"),
+ ("table-format"="apache-iceberg"),
+ ("metadata-path"="my_table/")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.2.query.sqlpp
new file mode 100644
index 0000000..957509d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/iceberg/read-data.2.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM IcebergDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-empty/iceberg-empty.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-empty/iceberg-empty.01.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-empty/iceberg-empty.01.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.adm
new file mode 100644
index 0000000..ab4a5a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-modified-data/iceberg-modified-data.01.adm
@@ -0,0 +1,3 @@
+{ "id": 3, "data": "peaceful_pare" }
+{ "id": 4, "data": "laughing_mahavira" }
+{ "id": 5, "data": "vibrant_lamport" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.adm
new file mode 100644
index 0000000..1a14033
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg-multiple-data-files/iceberg-multiple-data-files.01.adm
@@ -0,0 +1,6 @@
+{ "id": 0, "data": "vibrant_mclean" }
+{ "id": 1, "data": "frosty_wilson" }
+{ "id": 2, "data": "serene_kirby" }
+{ "id": 3, "data": "peaceful_pare" }
+{ "id": 4, "data": "laughing_mahavira" }
+{ "id": 5, "data": "vibrant_lamport" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.adm
new file mode 100644
index 0000000..1ce01ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/iceberg/iceberg-read-from-latest-snapshot.01.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "data": "vibrant_mclean" }
+{ "id": 1, "data": "frosty_wilson" }
+{ "id": 2, "data": "serene_kirby" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/hdfs/iceberg/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/hdfs/iceberg/read-data.2.adm
new file mode 100644
index 0000000..1ce01ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/hdfs/iceberg/read-data.2.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "data": "vibrant_mclean" }
+{ "id": 1, "data": "frosty_wilson" }
+{ "id": 2, "data": "serene_kirby" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 8db1798..5950211 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -478,6 +478,51 @@
<output-dir compare="Text">common/byte_order_mark/tsv</output-dir>
</compilation-unit>
</test-case>
+ <!-- Iceberg Tests Start -->
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg">
+ <output-dir compare="Text">iceberg</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-unsupported-version">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>ASX1179: Unsupported iceberg format version</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-metadata-invalid-location">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>Unable to create adapter</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-metadata-specific-location">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>ASX1178: Unsupported iceberg table</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-mixed-data-format">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>avro-file.avro. Reason: not a Parquet file</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-empty">
+ <output-dir compare="Text">iceberg-empty</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-multiple-data-files">
+ <output-dir compare="Text">iceberg-multiple-data-files</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/s3">
+ <compilation-unit name="iceberg-modified-data">
+ <output-dir compare="Text">iceberg-modified-data</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="copy">
<test-case FilePath="copy">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
index 0d3ae3d..9dd6b99 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
@@ -58,5 +58,10 @@
<output-dir compare="Text">parquet</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="hdfs">
+ <compilation-unit name="iceberg">
+ <output-dir compare="Text">iceberg</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index b0826e8..2135c67 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -270,6 +270,9 @@
INVALID_SAMPLE_SIZE(1175),
OUT_OF_RANGE_SAMPLE_SIZE(1176),
INVALID_SAMPLE_SEED(1177),
+ UNSUPPORTED_ICEBERG_TABLE(1178),
+ UNSUPPORTED_ICEBERG_FORMAT_VERSION(1179),
+ ERROR_READING_ICEBERG_METADATA(1180),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 0bf523a..3f54340 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -272,6 +272,9 @@
1175 = Sample size options are "low", "medium", "high", or a number
1176 = Sample size has to be between %1$s and %2$s
1177 = Sample seed has to be a number or a string convertible to a number
+1178 = Unsupported iceberg table
+1179 = Unsupported iceberg format version
+1180 = Error reading iceberg metadata
# Feed Errors
3001 = Illegal state.
3002 = Tuple is too large for a frame
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 4c2374a..466bbb1 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -550,6 +550,16 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util-ajax</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.1</version>
+ </dependency>
</dependencies>
<!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
<repositories>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 92b7a95..93f1e69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -55,7 +55,8 @@
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
//Get path
- String path = buildPathURIs(configuration, warningCollector);
+ String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
+ ? configuration.get(ExternalDataConstants.KEY_PATH) : buildPathURIs(configuration, warningCollector);
//Put S3 configurations to AsterixDB's Hadoop configuration
putS3ConfToHadoopConf(configuration, path);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 479679e..5a5994e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -116,6 +116,9 @@
public static final String FORMAT_RECORD_WITH_METADATA = "record-with-metadata";
// a string representing the format of the record (for adapters which produces records with additional information like pk or metadata)
public static final String KEY_RECORD_FORMAT = "record-format";
+ public static final String TABLE_FORMAT = "table-format";
+ public static final String ICEBERG_METADATA_LOCATION = "metadata-path";
+ public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1;
public static final String KEY_META_TYPE_NAME = "meta-type-name";
public static final String KEY_ADAPTER_NAME = "adapter-name";
public static final String READER_STREAM = "stream";
@@ -196,6 +199,7 @@
public static final String FORMAT_CSV = "csv";
public static final String FORMAT_TSV = "tsv";
public static final String FORMAT_PARQUET = "parquet";
+ public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final Set<String> ALL_FORMATS;
static {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 1605360..cb47c5d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -64,6 +64,7 @@
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -74,6 +75,8 @@
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
import org.apache.asterix.runtime.projection.DataProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -89,6 +92,11 @@
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import org.apache.hyracks.util.StorageUtil;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
@@ -414,7 +422,7 @@
* @param adapterName adapter name
* @param configuration external data configuration
*/
- public static void prepare(String adapterName, Map<String, String> configuration) {
+ public static void prepare(String adapterName, Map<String, String> configuration) throws AlgebricksException {
if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
configuration.put(ExternalDataConstants.KEY_READER, adapterName);
}
@@ -428,6 +436,81 @@
&& configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT));
}
+
+ if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
+ prepareTableFormat(configuration);
+ }
+ }
+
+ /**
+ * Prepares the configuration for data-lake table formats
+ *
+ * @param configuration
+ * external data configuration
+ */
+ public static void prepareTableFormat(Map<String, String> configuration) throws AlgebricksException {
+ // Apache Iceberg table format
+ if (ExternalDataConstants.FORMAT_APACHE_ICEBERG.equals(configuration.get(ExternalDataConstants.TABLE_FORMAT))) {
+ Configuration conf = new Configuration();
+
+ String metadataPath = configuration.get(ExternalDataConstants.ICEBERG_METADATA_LOCATION);
+
+ // If the table is in S3
+ if (configuration.get(ExternalDataConstants.KEY_READER)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+
+ conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+ conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
+ configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ metadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+ + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ } else if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS)) {
+ conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
+ configuration.get(ExternalDataConstants.KEY_HDFS_URL));
+ metadataPath = configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + metadataPath;
+ }
+
+ HadoopTables tables = new HadoopTables(conf);
+
+ Table icebergTable = tables.load(metadataPath);
+
+ if (icebergTable instanceof BaseTable) {
+ BaseTable baseTable = (BaseTable) icebergTable;
+
+ if (baseTable.operations().current()
+ .formatVersion() != ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) {
+ throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION,
+ "AsterixDB only supports Iceberg version up to "
+ + ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION);
+ }
+
+ try (CloseableIterable<FileScanTask> fileScanTasks = baseTable.newScan().planFiles()) {
+
+ // Collect every data file of the latest snapshot into a comma-separated "path" value.
+ StringBuilder builder = new StringBuilder();
+
+ for (FileScanTask task : fileScanTasks) {
+ builder.append(",");
+ String path = task.file().path().toString();
+ builder.append(path);
+ }
+
+ if (builder.length() > 0) {
+ builder.deleteCharAt(0);
+ }
+
+ configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());
+
+ } catch (IOException e) {
+ throw new AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e);
+ }
+
+ } else {
+ throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE,
+ "Invalid iceberg base table. Please remove metadata specifiers");
+ }
+
+ }
}
/**
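
The formatVersion() gate above is what the iceberg-unsupported-version S3 test exercises: any table whose metadata reports a format version other than 1 is rejected with ASX1179. A minimal sketch of fixture data that would trip it, assuming Iceberg's standard "format-version" table property and an illustrative local path:

    import static org.apache.iceberg.types.Types.NestedField.required;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;

    class UnsupportedVersionFixtureSketch {
        static void create() {
            Schema schema = new Schema(required(1, "id", Types.IntegerType.get()));
            // "format-version"="2" makes operations().current().formatVersion() return 2,
            // so prepareTableFormat() throws UNSUPPORTED_ICEBERG_FORMAT_VERSION (ASX1179).
            new HadoopTables(new Configuration()).create(schema, PartitionSpec.unpartitioned(),
                    ImmutableMap.of("format-version", "2"), "/tmp/my-table-format-version-2");
        }
    }
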
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 79bbbe2..a62b346 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -44,6 +44,7 @@
public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
public static final String HADOOP_REGION = "fs.s3a.region";
public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
+ public static final String HADOOP_S3_FILESYSTEM_IMPLEMENTATION = "fs.s3a.impl";
/*
* Internal configurations
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 54526df..e223d54 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -562,6 +562,45 @@
</gavs>
<url>https://raw.githubusercontent.com/bcgit/bc-java/r1rv60/LICENSE.html</url>
</override>
+ <override>
+ <gavs>
+ <gav>org.jetbrains:annotations:17.0.0</gav>
+ </gavs>
+ <url>https://raw.githubusercontent.com/JetBrains/java-annotations/master/LICENSE.txt</url>
+ </override>
+ <override>
+ <gavs>
+ <gav>org.roaringbitmap:RoaringBitmap:0.9.22</gav>
+ <gav>org.roaringbitmap:shims:0.9.22</gav>
+ </gavs>
+ <url>https://raw.githubusercontent.com/RoaringBitmap/RoaringBitmap/0.9.39/LICENSE</url>
+ </override>
+ <override>
+ <gavs>
+ <gav>io.airlift:aircompressor:0.21</gav>
+ </gavs>
+ <url>https://raw.githubusercontent.com/airlift/aircompressor/0.21/license.txt</url>
+ <noticeUrl>https://raw.githubusercontent.com/airlift/aircompressor/0.21/notice.md</noticeUrl>
+ </override>
+ <override>
+ <gavs>
+ <gav>org.apache.orc:orc-core:1.8.0</gav>
+ </gavs>
+ <url>https://raw.githubusercontent.com/apache/orc/v1.8.0/LICENSE</url>
+ <noticeUrl>https://raw.githubusercontent.com/apache/orc/v1.8.0/NOTICE</noticeUrl>
+ </override>
+ <override>
+ <gavs>
+ <gav>tech.allegro.schema.json2avro:converter:0.2.15</gav>
+ </gavs>
+ <url>https://raw.githubusercontent.com/allegro/json-avro-converter/json-avro-converter-0.2.15/LICENSE.md</url>
+ </override>
+ <override>
+ <gavs>
+ <gav>com.github.stephenc.findbugs:findbugs-annotations:1.3.9-1</gav>
+ </gavs>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </override>
</overrides>
<licenses>
<license>
@@ -635,6 +674,11 @@
<aliasUrl>https://raw.githubusercontent.com/grpc/grpc-java/v1.52.1/LICENSE</aliasUrl>
<aliasUrl>https://raw.githubusercontent.com/googleapis/java-core/v2.8.0/LICENSE</aliasUrl>
<aliasUrl>https://raw.githubusercontent.com/google/gson/gson-parent-2.9.0/LICENSE</aliasUrl>
+ <aliasUrl>https://raw.githubusercontent.com/allegro/json-avro-converter/json-avro-converter-0.2.15/LICENSE.md</aliasUrl>
+ <aliasUrl>https://raw.githubusercontent.com/airlift/aircompressor/0.21/license.txt</aliasUrl>
+ <aliasUrl>https://raw.githubusercontent.com/apache/orc/v1.8.0/LICENSE</aliasUrl>
+ <aliasUrl>https://raw.githubusercontent.com/RoaringBitmap/RoaringBitmap/0.9.39/LICENSE</aliasUrl>
+ <aliasUrl>https://raw.githubusercontent.com/JetBrains/java-annotations/master/LICENSE.txt</aliasUrl>
</aliasUrls>
<metric>1</metric>
</license>
@@ -1276,11 +1320,6 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index a6d6342..0d6c70b 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1022,6 +1022,10 @@
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
@@ -1912,11 +1916,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.kitesdk</groupId>
- <artifactId>kite-data-core</artifactId>
- <version>1.1.0</version>
- </dependency>
<!-- Hadoop AWS start -->
<dependency>
<!-- Pick a newer AWS SDK -->
@@ -2009,6 +2008,11 @@
<artifactId>jetty-util-ajax</artifactId>
<version>9.4.48.v20220622</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index ae9f9fd..c0555ec 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -2472,4 +2472,115 @@
</properties>
</project>
</supplement>
+ <!-- RoaringBitmap is ASLv2 and has no NOTICE or embedded LICENSE-->
+ <supplement>
+ <project>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>0.9.22</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>0.9.22</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>0.9.22</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+ <supplement>
+ <project>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>shims</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>0.9.22</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>0.9.22</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>0.9.22</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- aircompressor is ASLv2 and has no NOTICE or embedded LICENSE-->
+ <supplement>
+ <project>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>0.21</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>0.21</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>0.21</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>0.21</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- json2avro converter is ASLv2 and has no NOTICE or embedded LICENSE-->
+ <supplement>
+ <project>
+ <groupId>tech.allegro.schema.json2avro</groupId>
+ <artifactId>converter</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>0.2.15</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>0.2.15</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>0.2.15</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- this reimplementation of findbugs-annotations is ASLv2 and has no NOTICE or embedded LICENSE-->
+ <supplement>
+ <project>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>1.3.9-1</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.3.9-1</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>1.3.9-1</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- caffeine is ASLv2 and has no NOTICE-->
+ <supplement>
+ <project>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedNotice>2.9.3</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- threeten is BSD 3-clause so it has no NOTICE -->
+ <supplement>
+ <project>
+ <groupId>org.threeten</groupId>
+ <artifactId>threeten-extra</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedNotice>1.7.1</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+ <!-- jetbrains annotations are ASLv2 and has no NOTICE or embedded LICENSE-->
+ <supplement>
+ <project>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>17.0.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>17.0.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>17.0.0</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+ <!-- orc-core is ASLv2 and has no embedded LICENSE or NOTICE-->
+ <supplement>
+ <project>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>1.8.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.8.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>1.8.0</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>1.8.0</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+
</supplementalDataModels>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_airlift_aircompressor_0.21_notice.md.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_airlift_aircompressor_0.21_notice.md.txt
new file mode 100644
index 0000000..20daf0c
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_airlift_aircompressor_0.21_notice.md.txt
@@ -0,0 +1,37 @@
+Snappy Copyright Notices
+=========================
+
+* Copyright 2011 Dain Sundstrom <dain@iq80.com>
+* Copyright 2011, Google Inc.<opensource@google.com>
+
+
+Snappy License
+===============
+Copyright 2011, Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_apache_orc_v1.8.0_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_apache_orc_v1.8.0_NOTICE.txt
new file mode 100644
index 0000000..22b6085
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_apache_orc_v1.8.0_NOTICE.txt
@@ -0,0 +1,8 @@
+Apache ORC
+Copyright 2013 and onwards The Apache Software Foundation.
+
+This product includes software developed by The Apache Software
+Foundation (http://www.apache.org/).
+
+This product includes software developed by Hewlett-Packard:
+(c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index e5c1d06..6dd822b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -104,6 +104,10 @@
<artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
@@ -126,6 +130,10 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>