[ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation.
Change-Id: I584873a47bf409351d6b63979117616bce415c8f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
index d62d2d1..60f4b83 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
@@ -31,11 +31,12 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
public class AvroFileExampleGeneratorUtil {
private static final String SCHEMA_STRING = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"SimpleRecord\",\n"
+ " \"namespace\": \"com.example\",\n" + " \"fields\": [\n" + " {\n"
- + " \"name\": \"unionField\",\n" + " \"type\": [\"int\", \"string\"],\n"
+ + " \"name\": \"unionField\",\n" + " \"type\": [\"int\", \"string\", \"bytes\"],\n"
+ " \"doc\": \"This field can be either an int or a string.\"\n" + " },\n" + " {\n"
+ " \"name\": \"mapField\",\n" + " \"type\": {\n" + " \"type\": \"map\",\n"
+ " \"values\": \"int\",\n" + " \"doc\": \"This is a map of string keys to int values.\"\n"
@@ -96,7 +97,7 @@
//second record to be added
GenericRecord record2 = new GenericData.Record(schema);
- record2.put("unionField", "Example string");
+ record2.put("unionField", ByteBuffer.wrap(new byte[] { 0x01, 0x05 }));
Map<String, Integer> map2 = new HashMap<>();
map2.put("key3", 3);
map2.put("key4", 4);
@@ -115,4 +116,9 @@
e.printStackTrace();
}
}
+
+ @Test
+ public void main() throws IOException {
+ writeExample();
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
index b97d9f8..c2ca9ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
@@ -24,6 +24,6 @@
USE test;
-SELECT RAW a.mapField.key1
+SELECT RAW a.mapField[0]
FROM AvroDataset a
ORDER BY a.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
index 5560bf9..92d5ea1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
@@ -1,2 +1,2 @@
-{ "key1": 1, "key2": 2 }
-{ "key3": 3, "key4": 4 }
\ No newline at end of file
+[ { "key": "key1", "value": 1 }, { "key": "key2", "value": 2 } ]
+[ { "key": "key3", "value": 3 }, { "key": "key4", "value": 4 } ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
index fe0b81f..73b283a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
@@ -1,2 +1,2 @@
-1
-null
\ No newline at end of file
+{ "key": "key1", "value": 1 }
+{ "key": "key3", "value": 3 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
index 15f8776..8fd0212 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
@@ -1,2 +1,2 @@
42
-"Example string"
\ No newline at end of file
+hex("0105")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 57474b3..1e90f98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -365,7 +365,7 @@
<placeholder name="adapter" value="S3" />
<output-dir compare="Text">none</output-dir>
<source-location>false</source-location>
- <expected-error>Not an Avro data file.</expected-error>
+ <expected-error>Malformed input stream</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index e048890..3f92f00 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -28,6 +28,8 @@
import java.util.Map;
import java.util.function.Supplier;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -35,6 +37,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -133,12 +136,15 @@
}
private boolean advance() throws IOException {
- if (inputStream.advance()) {
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
- dataFileStream = new DataFileStream<>(inputStream, datumReader);
- return true;
+ try {
+ if (inputStream.advance()) {
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+ dataFileStream = new DataFileStream<>(inputStream, datumReader);
+ return true;
+ }
+ } catch (InvalidAvroMagicException e) {
+ throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
}
-
return false;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 985e2b5..1ebe982 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -132,11 +132,9 @@
streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context),
configuration);
return streamRecordReader;
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException
- | NoSuchMethodException e) {
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException
+ | IOException e) {
throw HyracksDataException.create(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index d760c1f..6ee74d7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -91,19 +91,26 @@
}
private void parseMap(Schema mapSchema, Map<String, ?> map, DataOutput out) throws IOException {
- Schema valueSchema = mapSchema.getValueType();
- final IMutableValueStorage valueBuffer = parserContext.enterCollection();
- final IMutableValueStorage keyBuffer = parserContext.enterCollection();
+ final IMutableValueStorage item = parserContext.enterCollection();
+ final IMutableValueStorage valueBuffer = parserContext.enterObject();
IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ IAsterixListBuilder listBuilder =
+ parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
for (Map.Entry<String, ?> entry : map.entrySet()) {
- keyBuffer.reset();
+ objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
valueBuffer.reset();
- serializeString(entry.getKey(), Schema.Type.STRING, keyBuffer.getDataOutput());
- parseValue(valueSchema, entry.getValue(), valueBuffer.getDataOutput());
- objectBuilder.addField(keyBuffer, valueBuffer);
+ serializeString(entry.getKey(), Schema.Type.STRING, valueBuffer.getDataOutput());
+ objectBuilder.addField(parserContext.getSerializedFieldName("key"), valueBuffer);
+ valueBuffer.reset();
+ parseValue(mapSchema.getValueType(), entry.getValue(), valueBuffer.getDataOutput());
+ objectBuilder.addField(parserContext.getSerializedFieldName("value"), valueBuffer);
+ item.reset();
+ objectBuilder.write(item.getDataOutput(), true);
+ listBuilder.addItem(item);
}
- objectBuilder.write(out, true);
+ listBuilder.write(out, true);
parserContext.exitObject(valueBuffer, null, objectBuilder);
+ parserContext.exitCollection(item, listBuilder);
}
private final void parseUnion(Schema unionSchema, Object value, DataOutput out) throws IOException {
@@ -134,7 +141,7 @@
case BOOLEAN:
return value instanceof Boolean;
case BYTES:
- return value instanceof Byte;
+ return value instanceof ByteBuffer;
case RECORD:
return value instanceof GenericData.Record;
default: