[ASTERIXDB-3040][EXT]: Handle invalid Parquet file error

Change-Id: Ideeafac977722cabc79c26b4fe3c7e1ebb540a81
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16563
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp
new file mode 100644
index 0000000..ca5868c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS; /* start from a clean dataverse */
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE ParquetType as { /* no declared fields */
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+  %template%,
+  ("container"="playground"),
+  ("definition"="json-data/reviews/single-line/json"), /* a JSON data path, chosen on purpose for this negative test */
+  ("format" = "parquet") /* declared as Parquet although the path above names JSON data */
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp
new file mode 100644
index 0000000..a178663
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE COUNT(*) /* the S3 suite registers an expected error for this test, not a count */
+FROM ParquetDataset p;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp
new file mode 100644
index 0000000..20dc6fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS; /* cleanup: removes the dataverse created in test.000.ddl.sqlpp */
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 12a8ae2..7242984 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -172,6 +172,14 @@
         <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/parquet/invalid-parquet-files"> <!-- negative test: a JSON data path read through a dataset declared as Parquet -->
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">none</output-dir>
+        <source-location>false</source-location>
+        <expected-error>20-records.json. Reason: not a Parquet file</expected-error> <!-- tail of error 59: "Invalid Parquet file: %1$s. Reason: %2$s" -->
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="external-dataset" check-warnings="true">
       <compilation-unit name="common/parquet/parquet-types/unset-flags">
         <placeholder name="adapter" value="S3" />
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 068c125..fc234ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -85,6 +85,8 @@
     PARQUET_DECIMAL_TO_DOUBLE_PRECISION_LOSS(55),
     PARQUET_TIME_ZONE_ID_IS_NOT_SET(56),
     PARQUET_CONTAINS_OVERFLOWED_BIGINT(57),
+    UNEXPECTED_ERROR_ENCOUNTERED(58),
+    INVALID_PARQUET_FILE(59),
 
     UNSUPPORTED_JRE(100),
 
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index f5ef79d..b98dc43 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -92,6 +92,8 @@
 55 = Parquet decimal precision loss: precision '%1$s' is greater than the maximum supported precision '%2$s'
 56 = Parquet file(s) contain values of the temporal type '%1$s' that are adjusted to UTC. Recreate the external dataset and set the option '%2$s' to get the local-adjusted '%1$s' value
 57 = Parquet file(s) contain unsigned integer that is larger than the '%1$s' range
+58 = Error encountered: %1$s
+59 = Invalid Parquet file: %1$s. Reason: %2$s
 
 100 = Unsupported JRE: %1$s
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 328e09d..d3ad968 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -25,14 +25,18 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
@@ -103,8 +107,8 @@
                 } else if (oldSplit instanceof FileSplit) {
                     realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
                 } else {
-                    throw new IllegalArgumentException(
-                            "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
+                    throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+                            LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
                 }
                 valueContainer = new VoidPointable();
                 firstRecord = false;
@@ -119,6 +123,26 @@
                 }
             } catch (InterruptedException e) {
                 throw new IOException(e);
+            } catch (HyracksDataException | AsterixParquetRuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                if (e.getMessage() != null && e.getMessage().contains("not a Parquet file")) {
+                    throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+                            LogRedactionUtil.userData(getPath(oldSplit)), "not a Parquet file");
+                }
+
+                throw RuntimeDataException.create(ErrorCode.UNEXPECTED_ERROR_ENCOUNTERED,
+                        LogRedactionUtil.userData(e.toString()));
+            }
+        }
+
+        private String getPath(InputSplit split) { // returns a printable location of the split for error messages
+            if (split instanceof FileSplit) { // plain Hadoop file split: use its path
+                return ((FileSplit) split).getPath().toString();
+            } else if (split instanceof ParquetInputSplitWrapper) { // wrapper: use the path of the split it wraps
+                return ((ParquetInputSplitWrapper) split).realSplit.getPath().toString();
+            } else { // neither known kind: fall back to the split's own toString()
+                return split.toString();
             }
         }
 
@@ -215,5 +239,10 @@
         public void write(DataOutput out) throws IOException {
             realSplit.write(out);
         }
+
+        @Override
+        public String toString() { // delegates to the wrapped split; presumably so error text built from this wrapper describes the real split
+            return realSplit.toString();
+        }
     }
 }