[NO ISSUE][EXT] Fix error reporting when processing external datasets records
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Fix error reporting when the record size retrieved from external datasets
exceeds the limit instead of reporting internal error.
Change-Id: I0f0973356c727d7d7a28252163aba2591c4205db
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6083
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index aa4abb4..5d7ffb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -65,7 +65,7 @@
size = length;
}
- private void ensureCapacity(int len) throws IOException {
+ private void ensureCapacity(int len) throws RuntimeDataException {
if (value.length < len) {
if (len > ExternalDataConstants.MAX_RECORD_SIZE) {
throw new RuntimeDataException(ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE,
@@ -77,7 +77,7 @@
}
}
- public void append(char[] recordBuffer, int offset, int length) throws IOException {
+ public void append(char[] recordBuffer, int offset, int length) throws RuntimeDataException {
ensureCapacity(size + length);
System.arraycopy(recordBuffer, offset, value, size, length);
size += length;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 38eec98..fa4a4a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -152,10 +152,10 @@
if (appendLength > 0) {
try {
record.append(inputBuffer, startPosn, appendLength);
- } catch (IOException e) {
+ } catch (RuntimeDataException e) {
reader.reset();
bufferPosn = bufferLength = 0;
- throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
+ throw e;
}
}
} while (!hasFinished);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 3d23b24..4fec3c4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -121,21 +121,31 @@
return false;
}
Throwable root = ExceptionUtils.getRootCause(th);
- if (root instanceof HyracksDataException
- && ((HyracksDataException) root).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
- if (currentFile != null) {
- try {
- logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
- } catch (IOException e) {
- LOGGER.log(Level.WARN, "Filed to write to feed log file", e);
+ if (root instanceof HyracksDataException) {
+ HyracksDataException r = (HyracksDataException) root;
+ String component = r.getComponent();
+ if (ErrorCode.ASTERIX.equals(component)) {
+ int errorCode = r.getErrorCode();
+ switch (errorCode) {
+ case ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM:
+ if (currentFile != null) {
+ try {
+ logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+ } catch (IOException e) {
+ LOGGER.log(Level.WARN, "Filed to write to feed log file", e);
+ }
+ LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath());
+ }
+ case ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE:
+ try {
+ advance();
+ return true;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e);
+ }
+ default:
+ break;
}
- LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath());
- }
- try {
- advance();
- return true;
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e);
}
}
LOGGER.log(Level.WARN, "Failed to recover from failure", th);