Fix continue ingestion bug when exception happens

1. Fix the bug when exception happens, localfs adapter couldn't pick up
   a new file to continue the ingestion.
2. Change the exception handling from string to error code.
3. Added a test case.
4. Removed some tailing . in error message.

Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1713
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/data/restaurants/malformed_record.adm b/asterixdb/asterix-app/data/restaurants/malformed_record.adm
new file mode 100644
index 0000000..33a8f8f
--- /dev/null
+++ b/asterixdb/asterix-app/data/restaurants/malformed_record.adm
@@ -0,0 +1 @@
+This is a test data file for record-reader-with-malformed-input-stream test case.
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql
new file mode 100644
index 0000000..747c858
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.ddl.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type RestaurantsType as open {
+    restr_id: int
+}
+
+create dataset Restaurants(RestaurantsType) primary key restr_id;
+
+create feed RFeed using localfs
+(("type-name"="RestaurantsType"),
+("path"="asterix_nc1://data/restaurants/"), /* This test case reply on restaurants.adm implicitly*/
+("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql
new file mode 100644
index 0000000..a281c27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.2.update.aql
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed RFeed to dataset Restaurants;
+
+start feed RFeed;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql
new file mode 100644
index 0000000..6559ae8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.4.sleep.aql
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+2000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql
new file mode 100644
index 0000000..1301afe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.5.update.aql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+stop feed RFeed;
+disconnect feed RFeed from dataset Restaurants;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql
new file mode 100644
index 0000000..00d74a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.6.query.aql
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+
+count(for $x in dataset Restaurants
+return $x);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql
new file mode 100644
index 0000000..e3097c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.8.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm
new file mode 100644
index 0000000..8580e7b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/record-reader-with-malformed-input-stream/record-reader-with-malformed-input-stream.1.adm
@@ -0,0 +1 @@
+30
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index d2379c5..d28371d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -268,6 +268,11 @@
         <expected-error>This operation cannot be done when Feed</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="record-reader-with-malformed-input-stream">
+        <output-dir compare="Text">record-reader-with-malformed-input-stream</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="upsert">
     <test-case FilePath="upsert">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9de9dde..702cb0a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -117,7 +117,7 @@
     public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_NULL_IN_NON_OPTIONAL = 3018;
     public static final int INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_CANNT_GET_PKEY = 3019;
     public static final int FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED = 3020;
-    public static final int FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL = 3021;
+    public static final int RECORD_READER_MALFORMED_INPUT_STREAM = 3021;
     public static final int PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE = 3022;
     public static final int PROVIDER_DATASOURCE_FACTORY_UNKNOWN_INPUT_STREAM_FACTORY = 3023;
     public static final int UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY = 3024;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index b6423f6..1387c6f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -87,7 +87,7 @@
 3002 = Tuple is too large for a frame
 3003 = Unknown tuple forward policy
 3004 = Unable to create adapter as class loader not configured for library %1$s in dataverse %2$s
-3005 = At record: %1$s - Field %2$s is not privatean optional type so it cannot accept null value.
+3005 = At record: %1$s - Field %2$s is not privatean optional type so it cannot accept null value
 3006 = Illegal field %1$s in closed type %2$s
 3007 = Twitter4J library not found!
 3008 = Unable to ingest data
@@ -97,13 +97,13 @@
 3012 = Failed to get columns of record
 3013 = Can't deserialize Hive records with no closed columns
 3014 = Non-optional UNION type is not supported.
-3015 = Failed to get the type information for field %1$s.
+3015 = Failed to get the type information for field %1$s
 3016 = can't parse null field
 3017 = can't parse hive list with null values
-3018 = Field %1$s of meta record is not an optional type so it cannot accept null value.
+3018 = Field %1$s of meta record is not an optional type so it cannot accept null value
 3019 = Can't get PK from record part
-3020 = This operation cannot be done when Feed %1$s is alive.
-3021 = Could not register feed intake job [%1$s] for feed  %2$s
+3020 = This operation cannot be done when Feed %1$s is alive
+3021 = Malformed input stream
 3022 = Unknown data source type: %1$s
 3023 = Unknown input stream factory: %1$s
 3024 = Failed to create stream factory
@@ -123,44 +123,44 @@
 3039 = Cannot parse list item of type %1$s
 3040 = Argument type: %1$s
 3041 = Unable to load/instantiate class %1$s
-3042 = UDF of kind %1$s not supported.
+3042 = UDF of kind %1$s not supported
 3043 = Unknown function kind %1$s
 3044 = Library class loader already registered!
 3045 = Cannot handle a function argument of type %1$s
-3046 = Object of type %1$s not supported.
+3046 = Object of type %1$s not supported
 3047 = External %1$s not supported
 3048 = Invalid feed runtime: %1$s
-3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1.
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1.
-3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'.
+3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
+3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
 3052 = Was not able to find a file in the files index
 3053 = Field %1$s can not be null
 3054 = Mismatch Type, expecting a value of type %1$s
-3055 = Unexpected ADM token kind: %1$s.
+3055 = Unexpected ADM token kind: %1$s
 3056 = Illegal escape '\%1$s'
 3057 = Found END_RECORD while expecting a record field.
 3058 = This record is closed, you can not add extra fields! new field name: %1$s
-3059 = Unexpected ADM token kind: %1$s while expecting ":".
-3060 = Found COMMA %1$s %2$s record field.
-3061 = Unsupported interval type: %1$s.
-3062 = Interval was not closed.
+3059 = Unexpected ADM token kind: %1$s while expecting ":"
+3060 = Found COMMA %1$s %2$s record field
+3061 = Unsupported interval type: %1$s
+3062 = Interval was not closed
 3063 = The interval start and end point types do not match: %1$s != %2$s
-3064 = Missing COMMA before interval end point.
-3065 = This can not be an instance of interval: missing T for a datetime value.
-3066 = Unsupported interval type: %1$s.
-3067 = Interval argument not properly constructed.
-3068 = Found END_COLLECTION while expecting a list item.
-3069 = Found COMMA before any list item.
-3070 = Found COMMA while expecting a list item.
-3071 = Found END_RECORD while expecting a list item.
-3072 = Can't cast the %1$s type to the %2$s type.
-3073 = Missing deserializer method for constructor: %1$s.
+3064 = Missing COMMA before interval end point
+3065 = This can not be an instance of interval: missing T for a datetime value
+3066 = Unsupported interval type: %1$s
+3067 = Interval argument not properly constructed
+3068 = Found END_COLLECTION while expecting a list item
+3069 = Found COMMA before any list item
+3070 = Found COMMA while expecting a list item
+3071 = Found END_RECORD while expecting a list item
+3072 = Can't cast the %1$s type to the %2$s type
+3073 = Missing deserializer method for constructor: %1$s
 3074 = This can not be an instance of %1$s
-3075 = Closed field %1$s has null value.
+3075 = Closed field %1$s has null value
 3076 = %1$s: no files found
 3077 = %1$s: path not found
 3078 = Cannot obtain hdfs scheduler
 3079 = Cannot register runtime, active manager has been shutdown
 3080 = Unexpected feed datatype '%1$s'
-3081 = socket is not properly configured.
+3081 = socket is not properly configured
 3082 = "Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s"
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 1b12dc1..d01859e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -177,7 +177,7 @@
         if (!recordReader.handleException(th)) {
             finish();
         }
-        return closed.get();
+        return !closed.get();
     }
 
     public IRecordReader<T> getReader() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 4d6d004..7614e6eb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,7 +103,7 @@
                         // corrupted file. clear the buffer and stop reading
                         reader.reset();
                         bufferPosn = bufferLength = 0;
-                        throw new IOException("Malformed input stream");
+                        throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
                     }
                 }
             }
@@ -141,7 +143,7 @@
                 } catch (IOException e) {
                     reader.reset();
                     bufferPosn = bufferLength = 0;
-                    throw new IOException("Malformed input stream");
+                    throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
                 }
             }
         } while (!hasFinished);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 3c3b8fb..00ac090 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -22,6 +22,8 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -153,23 +155,21 @@
         if (in == null) {
             return false;
         }
-        if (th instanceof IOException) {
-            // TODO: Change from string check to exception type
-            if (th.getCause().getMessage().contains("Malformed input stream")) {
-                if (currentFile != null) {
-                    try {
-                        logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
-                    } catch (IOException e) {
-                        LOGGER.warn("Filed to write to feed log file", e);
-                    }
-                    LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
-                }
+        if (th instanceof HyracksDataException
+                && ((HyracksDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
+            if (currentFile != null) {
                 try {
-                    advance();
-                    return true;
-                } catch (Exception e) {
-                    LOGGER.warn("An exception was thrown while trying to skip a file", e);
+                    logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+                } catch (IOException e) {
+                    LOGGER.warn("Filed to write to feed log file", e);
                 }
+                LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+            }
+            try {
+                advance();
+                return true;
+            } catch (Exception e) {
+                LOGGER.warn("An exception was thrown while trying to skip a file", e);
             }
         }
         LOGGER.warn("Failed to recover from failure", th);