[ASTERIXDB-3365][EXT] Revisiting Avro logical types
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details: Adding optional parameters to parse avro logical types.
- DECIMAL_TO_DOUBLE: Converts decimal values to double. (Default: false)
- UUID_AS_STRING: Converts UUID values to string representation. (Default: true)
- DATE_AS_INT: Converts date values to integers. (Default: true)
- TIME_AS_LONG: Converts time values to long. (Default: true)
- TIMESTAMP_AS_LONG: Convert timestamp to long (default: true)
Ext-ref: MB-64841,MB-64840
Change-Id: I6ce5d7f7c3deac986abd416583133475d1f86f27
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19306
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c94fd60..c0b5e26 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -421,6 +421,7 @@
loadData(generatedDataBasePath, "", "avro_type.avro", definition, definitionSegment, false, false);
loadData(generatedDataBasePath, "", "partition_heterogeneous.avro", definition, definitionSegment, false,
false);
+ loadData(generatedDataBasePath, "", "avro_logical_type.avro", definition, definitionSegment, false, false);
Collection<File> files =
IoUtil.getMatchingFiles(Paths.get(generatedDataBasePath + "/external-filter"), AVRO_FILTER);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
index db12e7b..4c13052 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
@@ -107,5 +107,6 @@
writeAvroFile(jsonFile, outputPath);
}
AvroFileExampleGeneratorUtil.writeExample();
+ AvroLogicalTypesExampleGenerator.writeLogicalTypesExample();
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
new file mode 100644
index 0000000..fd54627
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.external_dataset.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
+
+public class AvroLogicalTypesExampleGenerator {
+ private static final String SCHEMA_STRING = "{\n" + " \"type\": \"record\",\n"
+ + " \"name\": \"LogicalTypesRecord\",\n" + " \"namespace\": \"com.example\",\n" + " \"fields\": [\n"
+ + " { \"name\": \"decimalField\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2 } },\n"
+ + " { \"name\": \"uuidField\", \"type\": { \"type\": \"string\", \"logicalType\": \"uuid\" } },\n"
+ + " { \"name\": \"dateField\", \"type\": { \"type\": \"int\", \"logicalType\": \"date\" } },\n"
+ + " { \"name\": \"timeMillisField\", \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" } },\n"
+ + " { \"name\": \"timeMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"time-micros\" } },\n"
+ + " { \"name\": \"timestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-millis\" } },\n"
+ + " { \"name\": \"timestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-micros\" } },\n"
+ + " { \"name\": \"localTimestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-millis\" } },\n"
+ + " { \"name\": \"localTimestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-micros\" } }\n"
+ + " ]\n" + "}";
+
+ private static final String AVRO_GEN_BASEDIR = "target/generated_avro_files";
+ private static final String FILE_NAME = "avro_logical_type.avro";
+
+ public static void writeLogicalTypesExample() throws IOException {
+ Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
+ File destPath = new File(AVRO_GEN_BASEDIR);
+ if (!destPath.exists()) {
+ destPath.mkdirs();
+ }
+ File outputFile = new File(destPath, FILE_NAME);
+
+ DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
+ try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+ dataFileWriter.create(schema, outputFile);
+
+ // First record
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("decimalField", ByteBuffer.wrap(new byte[] { 0x01, 0x02 }));
+ record.put("uuidField", "123e4567-e89b-12d3-a456-426614174000");
+ record.put("dateField", 20061);
+ record.put("timeMillisField", 12345678);
+ record.put("timeMicrosField", 12345678901234L);
+ record.put("timestampMillisField", 1733344079083L);
+ record.put("timestampMicrosField", 1733344079083000L);
+ record.put("localTimestampMillisField", 1733344079083L);
+ record.put("localTimestampMicrosField", 1733344079083000L);
+
+ dataFileWriter.append(record);
+ } catch (IOException e) {
+ System.err.println("Failed to write AVRO file: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void main() throws IOException {
+ writeLogicalTypesExample();
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
new file mode 100644
index 0000000..73d8056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE AvroType as {
+};
+
+CREATE EXTERNAL DATASET AvroDataset(AvroType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="avro-data/reviews"),
+ ("include"="*avro_logical_type.avro"),
+ ("decimal-to-double"="true"),
+ ("uuid-to-string"="true"),
+ ("date-to-int"="true"),
+ ("time-to-long"="true"),
+ ("timestamp-to-long"="true"),
+ ("format" = "avro")
+);
+
+CREATE EXTERNAL DATASET AvroDataset2(AvroType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="avro-data/reviews"),
+ ("include"="*avro_logical_type.avro"),
+ ("decimal-to-double"="true"),
+ ("uuid-to-string"="false"),
+ ("date-to-int"="false"),
+ ("time-to-long"="false"),
+ ("timestamp-to-long"="false"),
+ ("format" = "avro")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
new file mode 100644
index 0000000..8a72adb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE a
+FROM AvroDataset a;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
new file mode 100644
index 0000000..b927aba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE a
+FROM AvroDataset2 a;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
new file mode 100644
index 0000000..7690b33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField": "123e4567-e89b-12d3-a456-426614174000", "dateField": 20061, "timeMillisField": 12345678, "timeMicrosField": -539222987, "timestampMillisField": 1733344079083, "timestampMicrosField": 1733344079083, "localTimestampMillisField": 1733344079083, "localTimestampMicrosField": 1733344079083 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
new file mode 100644
index 0000000..51eedae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField": uuid("123e4567-e89b-12d3-a456-426614174000"), "dateField": date("2024-12-04"), "timeMillisField": time("03:25:45.678"), "timeMicrosField": time("18:12:57.013"), "timestampMillisField": datetime("2024-12-04T20:27:59.083"), "timestampMicrosField": datetime("2024-12-04T20:27:59.083"), "localTimestampMillisField": datetime("2024-12-04T20:27:59.083"), "localTimestampMicrosField": datetime("2024-12-04T20:27:59.083") }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index b34dba9..da33b2e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -582,6 +582,12 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/avro/avro-logical-types">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/avro/avro-logical-types</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-nested-records">
<placeholder name="adapter" value="S3" />
<placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 35e0699..e131f8a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -98,6 +98,7 @@
UNSUPPORTED_COLUMN_TYPE(67),
INVALID_KEY_TYPE(68),
FAILED_TO_READ_KEY(69),
+ AVRO_SUPPORTED_TYPE_WITH_OPTION(70),
UNSUPPORTED_JRE(100),
EXTERNAL_UDF_RESULT_TYPE_ERROR(200),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d15a751..16dcda5 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -104,6 +104,7 @@
67 = Type(s) '%1$s' are not supported in columnar storage format. Supported types are %2$s
68 = Invalid key type. Expected '%1$s', found '%2$s'.
69 = Failed to read key. Reason: %1$s.
+70 = Avro type '%1$s' is not supported by default. To enable type conversion, recreate the external dataset with the option '%2$s' enabled
100 = Unsupported JRE: %1$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
new file mode 100644
index 0000000..50a38eb
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableTime;
+import org.apache.asterix.om.base.AMutableUUID;
+import org.apache.asterix.om.base.ATime;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+public class AvroConverterContext extends ParserContext {
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+ private final ISerializerDeserializer<ATime> timeSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME);
+ private final ISerializerDeserializer<ADouble> doubleSerDer =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ protected ISerializerDeserializer<AUUID> uuidSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AUUID);
+
+ private final boolean decimalToDouble;
+ private final boolean timestampAsLong;
+ private final boolean dateAsInt;
+ private final boolean timeAsLong;
+ private final boolean uuidAsString;
+
+ private final int timeZoneOffset;
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final AMutableDouble mutableDouble = new AMutableDouble(0.0);
+ private final AMutableTime mutableTime = new AMutableTime(0);
+ protected AMutableUUID aUUID = new AMutableUUID();
+ private final List<Warning> warnings;
+
+ public AvroConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+ this.warnings = warnings;
+ decimalToDouble = Boolean.parseBoolean(configuration
+ .getOrDefault(ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+ timestampAsLong = Boolean.parseBoolean(configuration
+ .getOrDefault(ExternalDataConstants.AvroOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE));
+ dateAsInt = Boolean.parseBoolean(
+ configuration.getOrDefault(ExternalDataConstants.AvroOptions.DATE_AS_INT, ExternalDataConstants.TRUE));
+ timeAsLong = Boolean.parseBoolean(
+ configuration.getOrDefault(ExternalDataConstants.AvroOptions.TIME_AS_LONG, ExternalDataConstants.TRUE));
+ uuidAsString = Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.AvroOptions.UUID_AS_STRING,
+ ExternalDataConstants.TRUE));
+ String configuredTimeZoneId = configuration.get(ExternalDataConstants.AvroOptions.TIMEZONE);
+ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+ timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
+ }
+ }
+
+ public void serializeDate(Object value, DataOutput output) {
+ try {
+ int intValue = (int) ((Number) value).longValue();
+ mutableDate.setValue(intValue);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeTime(int value, DataOutput output) {
+ try {
+ mutableTime.setValue(value);
+ timeSerDer.serialize(mutableTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDecimal(Object value, DataOutput output, int scale) throws IOException {
+ if (value instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ BigInteger unscaledValue = new BigInteger(bytes);
+ BigDecimal bigDecimal = new BigDecimal(unscaledValue, scale);
+ serializeDouble(bigDecimal.doubleValue(), output);
+ } else {
+ throw new IOException(
+ "Expected ByteBuffer for Decimal logical type, but got: " + value.getClass().getName());
+ }
+ }
+
+ public void serializeDouble(double value, DataOutput output) {
+ try {
+ mutableDouble.setValue(value);
+ doubleSerDer.serialize(mutableDouble, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeUUID(Object value, DataOutput output) throws HyracksDataException {
+ String uuidValue = value.toString();
+ char[] buffer = uuidValue.toCharArray();
+ aUUID.parseUUIDString(buffer, 0, uuidValue.length());
+ uuidSerde.serialize(aUUID, output);
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ public boolean isTimestampAsLong() {
+ return timestampAsLong;
+ }
+
+ public boolean isDateAsInt() {
+ return dateAsInt;
+ }
+
+ public boolean isUuidAsString() {
+ return uuidAsString;
+ }
+
+ public boolean isTimeAsLong() {
+ return timeAsLong;
+ }
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index c3563cc..faf6d72 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -24,9 +24,11 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.IAsterixListBuilder;
@@ -36,26 +38,31 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.stream.AvroConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
public class AvroDataParser extends AbstractDataParser implements IRecordDataParser<GenericRecord> {
- private final ParserContext parserContext;
+ private final AvroConverterContext parserContext;
private final IExternalFilterValueEmbedder valueEmbedder;
- public AvroDataParser(IExternalDataRuntimeContext context) {
- parserContext = new ParserContext();
+ public AvroDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
+ List<Warning> warnings = new ArrayList<>();
+ parserContext = new AvroConverterContext(conf, warnings);
valueEmbedder = context.getValueEmbedder();
}
@@ -192,6 +199,41 @@
private ATypeTag getTypeTag(Schema schema, Object value) throws HyracksDataException {
Schema.Type schemaType = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType instanceof LogicalTypes.Uuid) {
+ if (parserContext.isUuidAsString()) {
+ return ATypeTag.STRING;
+ }
+ return ATypeTag.UUID;
+ }
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ ensureDecimalToDoubleEnabled(logicalType, parserContext);
+ return ATypeTag.DOUBLE;
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ if (parserContext.isDateAsInt()) {
+ return ATypeTag.INTEGER;
+ }
+ return ATypeTag.DATE;
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ if (parserContext.isTimeAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.TIME;
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ if (parserContext.isTimeAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.TIME;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros
+ || logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.LocalTimestampMicros
+ || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+ if (parserContext.isTimestampAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.DATETIME;
+ }
+
if (value == null) {
// The 'value' is missing
return ATypeTag.MISSING;
@@ -228,8 +270,74 @@
}
}
+ private void parseLogicalValue(LogicalType logicalType, Object value, DataOutput out) throws IOException {
+ if (logicalType instanceof LogicalTypes.Uuid) {
+ if (parserContext.isUuidAsString()) {
+ serializeString(value, out);
+ } else {
+ parserContext.serializeUUID(value, out);
+ }
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+ int scale = decimalType.getScale();
+ parserContext.serializeDecimal(value, out, scale);
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ if (parserContext.isDateAsInt()) {
+ serializeLong(value, out);
+ } else {
+ parserContext.serializeDate(value, out);
+ }
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ int timeInMillis = (int) TimeUnit.MICROSECONDS.toMillis(((Number) value).longValue());
+ int offset = parserContext.getTimeZoneOffset();
+ timeInMillis = timeInMillis + offset;
+ if (parserContext.isTimeAsLong()) {
+ serializeLong(timeInMillis, out);
+ } else {
+ parserContext.serializeTime(timeInMillis + offset, out);
+ }
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ int timeInMillis = ((Number) value).intValue();
+ int offset = parserContext.getTimeZoneOffset();
+ timeInMillis = timeInMillis + offset;
+ if (parserContext.isTimeAsLong()) {
+ serializeLong(timeInMillis, out);
+ } else {
+ parserContext.serializeTime(timeInMillis, out);
+ }
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros
+ || logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+ long timeStampInMicros = ((Number) value).longValue();
+ int offset = parserContext.getTimeZoneOffset();
+ long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(timeStampInMicros);
+ timeStampInMillis = timeStampInMillis + offset;
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis, out);
+ }
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+ long timeStampInMillis = ((Number) value).longValue();
+ int offset = parserContext.getTimeZoneOffset();
+ timeStampInMillis = timeStampInMillis + offset;
+ if (parserContext.isTimestampAsLong()) {
+ serializeLong(timeStampInMillis, out);
+ } else {
+ parserContext.serializeDateTime(timeStampInMillis, out);
+ }
+ } else {
+ throw createUnsupportedException(logicalType.getName());
+ }
+ }
+
private void parseValue(Schema schema, Object value, DataOutput out) throws IOException {
Schema.Type type = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType != null) {
+ parseLogicalValue(logicalType, value, out);
+ return;
+ }
switch (type) {
case RECORD:
parseObject((GenericRecord) value, out);
@@ -247,6 +355,8 @@
nullSerde.serialize(ANull.NULL, out);
break;
case INT:
+ serializeInt(value, out);
+ break;
case LONG:
serializeLong(value, out);
break;
@@ -279,6 +389,12 @@
int64Serde.serialize(aInt64, out);
}
+ private void serializeInt(Object value, DataOutput out) throws HyracksDataException {
+ int intValue = ((Number) value).intValue();
+ aInt32.setValue(intValue);
+ int32Serde.serialize(aInt32, out);
+ }
+
private void serializeDouble(Object value, DataOutput out) throws HyracksDataException {
double doubleValue = ((Number) value).doubleValue();
aDouble.setValue(doubleValue);
@@ -290,7 +406,20 @@
stringSerde.serialize(aString, out);
}
+ private static void ensureDecimalToDoubleEnabled(LogicalType type, AvroConverterContext context)
+ throws RuntimeDataException {
+ if (!context.isDecimalToDoubleEnabled()) {
+ throw new RuntimeDataException(ErrorCode.AVRO_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+ ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE);
+ }
+ }
+
private static HyracksDataException createUnsupportedException(Schema schema) {
return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro Parser", schema);
}
+
+ private static HyracksDataException createUnsupportedException(String logicalType) {
+ return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro Parser, Invalid Logical Type: ", logicalType);
+ }
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
index 90fcc1a..c178401 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
@@ -59,7 +59,7 @@
}
private AvroDataParser createParser(IExternalDataRuntimeContext context) {
- return new AvroDataParser(context);
+ return new AvroDataParser(context, configuration);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 46a1b5b..4cc1656 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -403,6 +403,23 @@
WRITER_SUPPORTED_QUOTES = List.of(DEFAULT_QUOTE, DEFAULT_SINGLE_QUOTE, NONE);
}
+ public static class AvroOptions {
+ private AvroOptions() {
+ }
+
+ // - DECIMAL_TO_DOUBLE: Convert decimal to double (default: false)
+ // - UUID_AS_STRING: Convert UUID to string (default: true)
+ // - DATE_AS_INT: Convert date to integer (default: true)
+ // - TIME_AS_LONG: Convert time to long (default: true)
+ // - TIMESTAMP_AS_LONG: Convert timestamp to long (default: true)
+ public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+ public static final String UUID_AS_STRING = "uuid-to-string";
+ public static final String DATE_AS_INT = "date-to-int";
+ public static final String TIMEZONE = "timezone";
+ public static final String TIME_AS_LONG = "time-to-long";
+ public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+ }
+
public static class DeltaOptions {
private DeltaOptions() {
}