Merge "Merge branch 'gerrit/trinity' into 'master'"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 54c32ef..5df65ba 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -371,7 +371,7 @@
       <compilation-unit name="common/avro/invalid-avro-files">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
-        <expected-error>Malformed input stream</expected-error>
+        <expected-error>External source error. org.apache.avro.InvalidAvroMagicException: Not an Avro data file.</expected-error>
         <source-location>false</source-location>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index 3f92f00..1d76a90 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.external.util.ExternalDataConstants.EMPTY_STRING;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_REDACT_WARNINGS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -37,7 +38,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -55,7 +56,7 @@
     private static final List<String> recordReaderFormats =
             Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_AVRO));
 
-    public AvroRecordReader(AsterixInputStream inputStream, Map<String, String> config) throws IOException {
+    public AvroRecordReader(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
         record = new org.apache.asterix.external.input.record.GenericRecord<>();
         this.inputStream = new DiscretizedMultipleInputStream(inputStream);
         done = false;
@@ -91,21 +92,29 @@
     }
 
     @Override
-    public IRawRecord<GenericRecord> next() throws IOException {
-        avroRecord = dataFileStream.next(avroRecord);
-        record.set(avroRecord);
-        return record;
+    public IRawRecord<GenericRecord> next() throws HyracksDataException {
+        try {
+            avroRecord = dataFileStream.next(avroRecord);
+            record.set(avroRecord);
+            return record;
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
     }
 
     @Override
-    public boolean hasNext() throws IOException {
-        if (dataFileStream == null) {
-            return false;
+    public boolean hasNext() throws HyracksDataException {
+        try {
+            if (dataFileStream == null) {
+                return false;
+            }
+            if (dataFileStream.hasNext()) {
+                return true;
+            }
+            return advance() && dataFileStream.hasNext();
+        } catch (AvroRuntimeException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
         }
-        if (dataFileStream.hasNext()) {
-            return true;
-        }
-        return advance() && dataFileStream.hasNext();
     }
 
     @Override
@@ -126,7 +135,6 @@
     @Override
     public List<String> getRecordReaderFormats() {
         return recordReaderFormats;
-
     }
 
     @Override
@@ -135,17 +143,16 @@
 
     }
 
-    private boolean advance() throws IOException {
+    private boolean advance() throws HyracksDataException {
         try {
             if (inputStream.advance()) {
                 DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
                 dataFileStream = new DataFileStream<>(inputStream, datumReader);
                 return true;
             }
-        } catch (InvalidAvroMagicException e) {
-            throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
         }
         return false;
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index 49b9e74..8088daa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.parser;
 
 import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -40,6 +41,7 @@
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -62,8 +64,8 @@
             parseObject(record.get(), out);
             valueEmbedder.reset();
             return true;
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
         }
     }
 
@@ -278,12 +280,12 @@
         doubleSerde.serialize(aDouble, out);
     }
 
-    private void serializeString(Object value, DataOutput out) throws IOException {
+    private void serializeString(Object value, DataOutput out) throws HyracksDataException {
         aString.setValue(value.toString());
         stringSerde.serialize(aString, out);
     }
 
-    private static HyracksDataException createUnsupportedException(Schema schema) throws HyracksDataException {
+    private static HyracksDataException createUnsupportedException(Schema schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro Parser", schema);
     }
 }