Fix continue ingestion bug when exception happens
1. Fix the bug where, after an exception happens, the localfs adapter
could not pick up a new file to continue the ingestion.
2. Change the exception handling from string to error code.
3. Added a test case.
4. Removed some trailing . in error messages.
Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1713
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/data/restaurants/malformed_record.adm b/asterixdb/asterix-app/data/restaurants/malformed_record.adm
new file mode 100644
index 0000000..33a8f8f
--- /dev/null
+++ b/asterixdb/asterix-app/data/restaurants/malformed_record.adm
@@ -0,0 +1 @@
+This is a test data file for record-reader-with-malformed-input-stream test case.
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql
new file mode 100644
index 0000000..747c858
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type RestaurantsType as open {
+ restr_id: int
+}
+
+create dataset Restaurants(RestaurantsType) primary key restr_id;
+
+create feed RFeed using localfs
+(("type-name"="RestaurantsType"),
+("path"="asterix_nc1://data/restaurants/"), /* This test case relies on restaurants.adm implicitly */
+("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql
new file mode 100644
index 0000000..a281c27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed RFeed to dataset Restaurants;
+
+start feed RFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql
new file mode 100644
index 0000000..6559ae8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+2000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql
new file mode 100644
index 0000000..1301afe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+stop feed RFeed;
+disconnect feed RFeed from dataset Restaurants;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql
new file mode 100644
index 0000000..00d74a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+
+count(for $x in dataset Restaurants
+return $x);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql
new file mode 100644
index 0000000..e3097c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm
new file mode 100644
index 0000000..8580e7b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm
@@ -0,0 +1 @@
+30
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index d2379c5..d28371d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -268,6 +268,11 @@
<expected-error>This operation cannot be done when Feed</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="record-reader-with-malformed-input-stream">
+ <output-dir compare="Text">record-reader-with-malformed-input-stream</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="upsert">
<test-case FilePath="upsert">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9de9dde..702cb0a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -117,7 +117,7 @@
public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_NULL_IN_NON_OPTIONAL = 3018;
public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_CANNT_GET_PKEY = 3019;
public static final int FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED = 3020;
- public static final int FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL = 3021;
+ public static final int RECORD_READER_MALFORMED_INPUT_STREAM = 3021;
public static final int PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE = 3022;
public static final int PROVIDER_DATASOURCE_FACTORY_UNKNOWN_INPUT_STREAM_FACTORY = 3023;
public static final int UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY = 3024;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index b6423f6..1387c6f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -87,7 +87,7 @@
3002 = Tuple is too large for a frame
3003 = Unknown tuple forward policy
3004 = Unable to create adapter as class loader not configured for library %1$s in dataverse %2$s
-3005 = At record: %1$s - Field %2$s is not privatean optional type so it cannot accept null value.
+3005 = At record: %1$s - Field %2$s is not privatean optional type so it cannot accept null value
3006 = Illegal field %1$s in closed type %2$s
3007 = Twitter4J library not found!
3008 = Unable to ingest data
@@ -97,13 +97,13 @@
3012 = Failed to get columns of record
3013 = Can't deserialize Hive records with no closed columns
3014 = Non-optional UNION type is not supported.
-3015 = Failed to get the type information for field %1$s.
+3015 = Failed to get the type information for field %1$s
3016 = can't parse null field
3017 = can't parse hive list with null values
-3018 = Field %1$s of meta record is not an optional type so it cannot accept null value.
+3018 = Field %1$s of meta record is not an optional type so it cannot accept null value
3019 = Can't get PK from record part
-3020 = This operation cannot be done when Feed %1$s is alive.
-3021 = Could not register feed intake job [%1$s] for feed %2$s
+3020 = This operation cannot be done when Feed %1$s is alive
+3021 = Malformed input stream
3022 = Unknown data source type: %1$s
3023 = Unknown input stream factory: %1$s
3024 = Failed to create stream factory
@@ -123,44 +123,44 @@
3039 = Cannot parse list item of type %1$s
3040 = Argument type: %1$s
3041 = Unable to load/instantiate class %1$s
-3042 = UDF of kind %1$s not supported.
+3042 = UDF of kind %1$s not supported
3043 = Unknown function kind %1$s
3044 = Library class loader already registered!
3045 = Cannot handle a function argument of type %1$s
-3046 = Object of type %1$s not supported.
+3046 = Object of type %1$s not supported
3047 = External %1$s not supported
3048 = Invalid feed runtime: %1$s
-3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1.
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1.
-3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'.
+3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
+3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
3052 = Was not able to find a file in the files index
3053 = Field %1$s can not be null
3054 = Mismatch Type, expecting a value of type %1$s
-3055 = Unexpected ADM token kind: %1$s.
+3055 = Unexpected ADM token kind: %1$s
3056 = Illegal escape '\%1$s'
3057 = Found END_RECORD while expecting a record field.
3058 = This record is closed, you can not add extra fields! new field name: %1$s
-3059 = Unexpected ADM token kind: %1$s while expecting ":".
-3060 = Found COMMA %1$s %2$s record field.
-3061 = Unsupported interval type: %1$s.
-3062 = Interval was not closed.
+3059 = Unexpected ADM token kind: %1$s while expecting ":"
+3060 = Found COMMA %1$s %2$s record field
+3061 = Unsupported interval type: %1$s
+3062 = Interval was not closed
3063 = The interval start and end point types do not match: %1$s != %2$s
-3064 = Missing COMMA before interval end point.
-3065 = This can not be an instance of interval: missing T for a datetime value.
-3066 = Unsupported interval type: %1$s.
-3067 = Interval argument not properly constructed.
-3068 = Found END_COLLECTION while expecting a list item.
-3069 = Found COMMA before any list item.
-3070 = Found COMMA while expecting a list item.
-3071 = Found END_RECORD while expecting a list item.
-3072 = Can't cast the %1$s type to the %2$s type.
-3073 = Missing deserializer method for constructor: %1$s.
+3064 = Missing COMMA before interval end point
+3065 = This can not be an instance of interval: missing T for a datetime value
+3066 = Unsupported interval type: %1$s
+3067 = Interval argument not properly constructed
+3068 = Found END_COLLECTION while expecting a list item
+3069 = Found COMMA before any list item
+3070 = Found COMMA while expecting a list item
+3071 = Found END_RECORD while expecting a list item
+3072 = Can't cast the %1$s type to the %2$s type
+3073 = Missing deserializer method for constructor: %1$s
3074 = This can not be an instance of %1$s
-3075 = Closed field %1$s has null value.
+3075 = Closed field %1$s has null value
3076 = %1$s: no files found
3077 = %1$s: path not found
3078 = Cannot obtain hdfs scheduler
3079 = Cannot register runtime, active manager has been shutdown
3080 = Unexpected feed datatype '%1$s'
-3081 = socket is not properly configured.
+3081 = socket is not properly configured
3082 = "Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s"
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 1b12dc1..d01859e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -177,7 +177,7 @@
if (!recordReader.handleException(th)) {
finish();
}
- return closed.get();
+ return !closed.get();
}
public IRecordReader<T> getReader() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 4d6d004..7614e6eb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,7 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,7 +103,7 @@
// corrupted file. clear the buffer and stop reading
reader.reset();
bufferPosn = bufferLength = 0;
- throw new IOException("Malformed input stream");
+ throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
}
}
}
@@ -141,7 +143,7 @@
} catch (IOException e) {
reader.reset();
bufferPosn = bufferLength = 0;
- throw new IOException("Malformed input stream");
+ throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
}
}
} while (!hasFinished);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 3c3b8fb..00ac090 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -22,6 +22,8 @@
import java.io.FileInputStream;
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -153,23 +155,21 @@
if (in == null) {
return false;
}
- if (th instanceof IOException) {
- // TODO: Change from string check to exception type
- if (th.getCause().getMessage().contains("Malformed input stream")) {
- if (currentFile != null) {
- try {
- logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
- } catch (IOException e) {
- LOGGER.warn("Filed to write to feed log file", e);
- }
- LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
- }
+ if (th instanceof HyracksDataException
+ && ((HyracksDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
+ if (currentFile != null) {
try {
- advance();
- return true;
- } catch (Exception e) {
- LOGGER.warn("An exception was thrown while trying to skip a file", e);
+ logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+ } catch (IOException e) {
+ LOGGER.warn("Filed to write to feed log file", e);
}
+ LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+ }
+ try {
+ advance();
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("An exception was thrown while trying to skip a file", e);
}
}
LOGGER.warn("Failed to recover from failure", th);