[ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation.

Change-Id: I584873a47bf409351d6b63979117616bce415c8f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
index d62d2d1..60f4b83 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
@@ -31,11 +31,12 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
 
 public class AvroFileExampleGeneratorUtil {
     private static final String SCHEMA_STRING = "{\n" + "  \"type\": \"record\",\n" + "  \"name\": \"SimpleRecord\",\n"
             + "  \"namespace\": \"com.example\",\n" + "  \"fields\": [\n" + "    {\n"
-            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", \"string\"],\n"
+            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", \"string\", \"bytes\"],\n"
             + "      \"doc\": \"This field can be either an int or a string.\"\n" + "    },\n" + "    {\n"
             + "      \"name\": \"mapField\",\n" + "      \"type\": {\n" + "        \"type\": \"map\",\n"
             + "        \"values\": \"int\",\n" + "        \"doc\": \"This is a map of string keys to int values.\"\n"
@@ -96,7 +97,7 @@
 
             //second record to be added
             GenericRecord record2 = new GenericData.Record(schema);
-            record2.put("unionField", "Example string");
+            record2.put("unionField", ByteBuffer.wrap(new byte[] { 0x01, 0x05 }));
             Map<String, Integer> map2 = new HashMap<>();
             map2.put("key3", 3);
             map2.put("key4", 4);
@@ -115,4 +116,9 @@
             e.printStackTrace();
         }
     }
+
+    @Test
+    public void main() throws IOException {
+        writeExample();
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
index b97d9f8..c2ca9ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
@@ -24,6 +24,6 @@
 
 USE test;
 
-SELECT RAW a.mapField.key1
+SELECT RAW a.mapField[0]
 FROM AvroDataset a
 ORDER BY a.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
index 5560bf9..92d5ea1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
@@ -1,2 +1,2 @@
-{ "key1": 1, "key2": 2 }
-{ "key3": 3, "key4": 4 }
\ No newline at end of file
+[ { "key": "key1", "value": 1 }, { "key": "key2", "value": 2 } ]
+[ { "key": "key3", "value": 3 }, { "key": "key4", "value": 4 } ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
index fe0b81f..73b283a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
@@ -1,2 +1,2 @@
-1
-null
\ No newline at end of file
+{ "key": "key1", "value": 1 }
+{ "key": "key3", "value": 3 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
index 15f8776..8fd0212 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
@@ -1,2 +1,2 @@
 42
-"Example string"
\ No newline at end of file
+hex("0105")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 57474b3..1e90f98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -365,7 +365,7 @@
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
         <source-location>false</source-location>
-        <expected-error>Not an Avro data file.</expected-error>
+        <expected-error>Malformed input stream</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index e048890..3f92f00 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -28,6 +28,8 @@
 import java.util.Map;
 import java.util.function.Supplier;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -35,6 +37,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.avro.InvalidAvroMagicException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -133,12 +136,15 @@
     }
 
     private boolean advance() throws IOException {
-        if (inputStream.advance()) {
-            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-            dataFileStream = new DataFileStream<>(inputStream, datumReader);
-            return true;
+        try {
+            if (inputStream.advance()) {
+                DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+                dataFileStream = new DataFileStream<>(inputStream, datumReader);
+                return true;
+            }
+        } catch (InvalidAvroMagicException e) {
+            throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
         }
-
         return false;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 985e2b5..1ebe982 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -132,11 +132,9 @@
             streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context),
                     configuration);
             return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
-                | NoSuchMethodException e) {
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException
+                | IOException e) {
             throw HyracksDataException.create(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
         }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index d760c1f..6ee74d7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -91,19 +91,26 @@
     }
 
     private void parseMap(Schema mapSchema, Map<String, ?> map, DataOutput out) throws IOException {
-        Schema valueSchema = mapSchema.getValueType();
-        final IMutableValueStorage valueBuffer = parserContext.enterCollection();
-        final IMutableValueStorage keyBuffer = parserContext.enterCollection();
+        final IMutableValueStorage item = parserContext.enterCollection();
+        final IMutableValueStorage valueBuffer = parserContext.enterObject();
         IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IAsterixListBuilder listBuilder =
+                parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
         for (Map.Entry<String, ?> entry : map.entrySet()) {
-            keyBuffer.reset();
+            objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
             valueBuffer.reset();
-            serializeString(entry.getKey(), Schema.Type.STRING, keyBuffer.getDataOutput());
-            parseValue(valueSchema, entry.getValue(), valueBuffer.getDataOutput());
-            objectBuilder.addField(keyBuffer, valueBuffer);
+            serializeString(entry.getKey(), Schema.Type.STRING, valueBuffer.getDataOutput());
+            objectBuilder.addField(parserContext.getSerializedFieldName("key"), valueBuffer);
+            valueBuffer.reset();
+            parseValue(mapSchema.getValueType(), entry.getValue(), valueBuffer.getDataOutput());
+            objectBuilder.addField(parserContext.getSerializedFieldName("value"), valueBuffer);
+            item.reset();
+            objectBuilder.write(item.getDataOutput(), true);
+            listBuilder.addItem(item);
         }
-        objectBuilder.write(out, true);
+        listBuilder.write(out, true);
         parserContext.exitObject(valueBuffer, null, objectBuilder);
+        parserContext.exitCollection(item, listBuilder);
     }
 
     private final void parseUnion(Schema unionSchema, Object value, DataOutput out) throws IOException {
@@ -134,7 +141,7 @@
             case BOOLEAN:
                 return value instanceof Boolean;
             case BYTES:
-                return value instanceof Byte;
+                return value instanceof ByteBuffer;
             case RECORD:
                 return value instanceof GenericData.Record;
             default: