[ASTERIXDB-3040][EXT]: Handle invalid Parquet file error
Change-Id: Ideeafac977722cabc79c26b4fe3c7e1ebb540a81
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16563
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp
new file mode 100644
index 0000000..ca5868c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.000.ddl.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="json-data/reviews/single-line/json"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp
new file mode 100644
index 0000000..a178663
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.001.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE COUNT(*)
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp
new file mode 100644
index 0000000..20dc6fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/invalid-parquet-files/test.999.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 12a8ae2..7242984 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -172,6 +172,14 @@
<expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/parquet/invalid-parquet-files">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">none</output-dir>
+ <source-location>false</source-location>
+ <expected-error>20-records.json. Reason: not a Parquet file</expected-error>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="external-dataset" check-warnings="true">
<compilation-unit name="common/parquet/parquet-types/unset-flags">
<placeholder name="adapter" value="S3" />
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 068c125..fc234ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -85,6 +85,8 @@
PARQUET_DECIMAL_TO_DOUBLE_PRECISION_LOSS(55),
PARQUET_TIME_ZONE_ID_IS_NOT_SET(56),
PARQUET_CONTAINS_OVERFLOWED_BIGINT(57),
+ UNEXPECTED_ERROR_ENCOUNTERED(58),
+ INVALID_PARQUET_FILE(59),
UNSUPPORTED_JRE(100),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index f5ef79d..b98dc43 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -92,6 +92,8 @@
55 = Parquet decimal precision loss: precision '%1$s' is greater than the maximum supported precision '%2$s'
56 = Parquet file(s) contain values of the temporal type '%1$s' that are adjusted to UTC. Recreate the external dataset and set the option '%2$s' to get the local-adjusted '%1$s' value
57 = Parquet file(s) contain unsigned integer that is larger than the '%1$s' range
+58 = Error encountered: %1$s
+59 = Invalid Parquet file: %1$s. Reason: %2$s
100 = Unsupported JRE: %1$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 328e09d..d3ad968 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -25,14 +25,18 @@
import java.io.IOException;
import java.util.List;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
@@ -103,8 +107,8 @@
} else if (oldSplit instanceof FileSplit) {
realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
} else {
- throw new IllegalArgumentException(
- "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
+ throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+ LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
}
valueContainer = new VoidPointable();
firstRecord = false;
@@ -119,6 +123,26 @@
}
} catch (InterruptedException e) {
throw new IOException(e);
+ } catch (HyracksDataException | AsterixParquetRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("not a Parquet file")) {
+ throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+ LogRedactionUtil.userData(getPath(oldSplit)), "not a Parquet file");
+ }
+
+ throw RuntimeDataException.create(ErrorCode.UNEXPECTED_ERROR_ENCOUNTERED,
+ LogRedactionUtil.userData(e.toString()));
+ }
+ }
+
+ private String getPath(InputSplit split) {
+ if (split instanceof FileSplit) {
+ return ((FileSplit) split).getPath().toString();
+ } else if (split instanceof ParquetInputSplitWrapper) {
+ return ((ParquetInputSplitWrapper) split).realSplit.getPath().toString();
+ } else {
+ return split.toString();
}
}
@@ -215,5 +239,10 @@
public void write(DataOutput out) throws IOException {
realSplit.write(out);
}
+
+ @Override
+ public String toString() {
+ return realSplit.toString();
+ }
}
}