[ASTERIXDB-3392] Change schema format to asterix
Change-Id: I631b396da1e1c14deea7e2574a395ef5530b1564
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18111
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: <preethampoluparthi@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 26020e5..f30c800 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -109,6 +109,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
+import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
import org.apache.asterix.lang.common.base.IQueryRewriter;
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -4093,6 +4094,21 @@
ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
metadataProvider));
+ if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
+ .equalsIgnoreCase(ExternalDataConstants.FORMAT_PARQUET)) {
+ if (copyTo.getType() == null) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
+ "TYPE() Expression is required for parquet format");
+ }
+
+ DataverseName dataverseName =
+ DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
+ IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+ ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
+ edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
+ SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+ }
+
Map<VarIdentifier, IAObject> externalVars = createExternalVariables(copyTo, stmtParams);
// Query Rewriting (happens under the same ongoing metadata transaction)
LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
index 23d9316..b6ec758 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
@@ -30,8 +30,7 @@
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{"
+ "format":"parquet"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
index 2e811f2..5fb0a9b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
@@ -27,22 +27,23 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks3")
+TYPE ( {
+ id : string,
+ name : string,
+ nested :
+ {
+ first : string,
+ second : string
+ }
+ }
+)
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional binary id (UTF8);
- optional binary name (UTF8);
- optional group nested {
- optional binary first (UTF8);
- optional binary second (UTF8);
- }
- }
-"
+ "format":"parquet"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
index 9933fe2..8b1efc8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
@@ -26,21 +26,21 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks4")
+TYPE ( {
+ id : bigint,
+ name : string,
+ nested:
+ {
+ first : string
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional int64 id;
- optional binary name (UTF8);
- optional group nested {
- optional binary first (UTF8);
- }
- }
-"
+ "format":"parquet"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
index cf3a450..79b2d1a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
@@ -25,18 +25,18 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks5")
+TYPE ( {
+ id : bigint,
+ name : string,
+ nested : string
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional int64 id;
- optional binary name (UTF8);
- optional binary nested (UTF8);
- }"
+ "format":"parquet"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
index 2d53754..3e6ac48 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
@@ -25,23 +25,23 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks6")
+TYPE ( {
+ id : bigint,
+ name : {
+ first : string
+ },
+ nested:{
+ first : string,
+ second : string
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message spark_schema {
- optional int64 id;
- optional group name {
- optional binary first (UTF8);
- }
- optional group nested {
- optional binary first (UTF8);
- optional binary second (UTF8);
- }
- }"
+ "format":"parquet"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
index dcb0dcd..851559a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
@@ -25,6 +25,7 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks7")
+TYPE ( {id:int} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,7 +33,6 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema{}",
"row-group-size":"random"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
index 9856c4a..2f356fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
@@ -25,6 +25,7 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks8")
+TYPE ( {id:int} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,6 +33,5 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema{}",
"page-size":"random"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
index b8579a5..f2293e3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
@@ -25,6 +25,7 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks9")
+TYPE ( { name:string } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,6 +33,5 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "compression":"rar",
- "schema":""
-}
+ "compression":"rar"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp
new file mode 100644
index 0000000..4f52164
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks10")
+TYPE ( { name: } )
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet",
+ "schema":"message schema{"
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp
new file mode 100644
index 0000000..f22071a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks11")
+TYPE ( {
+ id : int , name : binary
+ } )
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet"
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp
new file mode 100644
index 0000000..d5d11eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks12")
+TYPE ( {
+ id : int,
+ name : string
+ } )
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet",
+ "version" : 3
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
new file mode 100644
index 0000000..75245f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks13")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int,string]
+ } )
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet"
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
new file mode 100644
index 0000000..0becb36
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks14")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int |
+ } )
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet"
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
new file mode 100644
index 0000000..cc67f79
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO S3
+PATH ("copy-to-result", "parquet-error-checks15")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int] )
+
+WITH {
+ "accessKeyId":"dummyAccessKey",
+ "secretAccessKey":"dummySecretKey",
+ "region":"us-west-2",
+ "serviceEndpoint":"http://127.0.0.1:8001",
+ "container":"playground",
+ "format":"parquet"
+}
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
index c2606b7..0012e22 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
@@ -24,22 +24,14 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-cover-data-types")
+TYPE ( { name : string, id : int, dateType : date, timeType : time, boolType : boolean, doubleType : double, datetimeType : datetime } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional binary name (UTF8);
- optional int32 id;
- optional int32 dateType (DATE);
- optional int32 timeType (TIME_MILLIS);
- optional boolean boolType ;
- optional double doubleType ;
- optional int64 datetimeType (TIMESTAMP_MILLIS);
- }"
+ "format":"parquet"
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
index 745ac87..4bec537 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
@@ -25,6 +25,7 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-simple")
+TYPE ( {id:string} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,8 +33,5 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema {
- optional binary id (UTF8);
- }"
-
+ "version" : "2"
};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
index 179d25c..aed4e09 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
@@ -24,216 +24,194 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-tweet")
+TYPE ( {
+ coordinates: {
+ coordinates: [
+ double
+ ],
+ `type` : string
+ },
+ created_at: string,
+ entities: {
+ urls: [
+ {
+ display_url: string,
+ expanded_url: string,
+ indices: [
+ int
+ ],
+ url: string
+ }
+ ],
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ geo: {
+ coordinates: [
+ double
+ ],
+ `type`: string
+ },
+ id: string,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ place: {
+ bounding_box: {
+ coordinates: [
+ [
+ [
+ double
+ ]
+ ]
+ ],
+ `type`: string
+ },
+ country: string,
+ country_code: string,
+ full_name: string,
+ id: string,
+ name: string,
+ place_type: string,
+ url: string
+ },
+ possibly_sensitive: boolean,
+ quoted_status: {
+ created_at: string,
+ entities: {
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ id: int,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ verified: boolean
+ }
+ },
+ quoted_status_id: int,
+ quoted_status_id_str: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ timestamp_ms: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ location: string,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ time_zone: string,
+ url: string,
+ utc_offset: int,
+ verified: boolean
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional group coordinates {
- optional group coordinates (LIST) {
- repeated group list {
- optional double element;
- }
- }
- optional binary type (UTF8);
- }
- optional binary created_at (UTF8);
- optional group entities {
- optional group urls (LIST) {
- repeated group list {
- optional group element {
- optional binary display_url (UTF8);
- optional binary expanded_url (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary url (UTF8);
- }
- }
- }
- optional group user_mentions (LIST) {
- repeated group list {
- optional group element {
- optional int64 id;
- optional binary id_str (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary name (UTF8);
- optional binary screen_name (UTF8);
- }
- }
- }
- }
- optional int64 favorite_count;
- optional boolean favorited;
- optional binary filter_level (UTF8);
- optional group geo {
- optional group coordinates (LIST) {
- repeated group list {
- optional double element;
- }
- }
- optional binary type (UTF8);
- }
- optional binary id (UTF8);
- optional binary id_str (UTF8);
- optional binary in_reply_to_screen_name (UTF8);
- optional int64 in_reply_to_status_id;
- optional binary in_reply_to_status_id_str (UTF8);
- optional int64 in_reply_to_user_id;
- optional binary in_reply_to_user_id_str (UTF8);
- optional boolean is_quote_status;
- optional binary lang (UTF8);
- optional group place {
- optional group bounding_box {
- optional group coordinates (LIST) {
- repeated group list {
- optional group element (LIST) {
- repeated group list {
- optional group element (LIST) {
- repeated group list {
- optional double element;
- }
- }
- }
- }
- }
- }
- optional binary type (UTF8);
- }
- optional binary country (UTF8);
- optional binary country_code (UTF8);
- optional binary full_name (UTF8);
- optional binary id (UTF8);
- optional binary name (UTF8);
- optional binary place_type (UTF8);
- optional binary url (UTF8);
- }
- optional boolean possibly_sensitive;
- optional group quoted_status {
- optional binary created_at (UTF8);
- optional group entities {
- optional group user_mentions (LIST) {
- repeated group list {
- optional group element {
- optional int64 id;
- optional binary id_str (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary name (UTF8);
- optional binary screen_name (UTF8);
- }
- }
- }
- }
- optional int64 favorite_count;
- optional boolean favorited;
- optional binary filter_level (UTF8);
- optional int64 id;
- optional binary id_str (UTF8);
- optional binary in_reply_to_screen_name (UTF8);
- optional int64 in_reply_to_status_id;
- optional binary in_reply_to_status_id_str (UTF8);
- optional int64 in_reply_to_user_id;
- optional binary in_reply_to_user_id_str (UTF8);
- optional boolean is_quote_status;
- optional binary lang (UTF8);
- optional int64 retweet_count;
- optional boolean retweeted;
- optional binary source (UTF8);
- optional binary text (UTF8);
- optional boolean truncated;
- optional group user {
- optional boolean contributors_enabled;
- optional binary created_at (UTF8);
- optional boolean default_profile;
- optional boolean default_profile_image;
- optional binary description (UTF8);
- optional int64 favourites_count;
- optional int64 followers_count;
- optional int64 friends_count;
- optional boolean geo_enabled;
- optional int64 id;
- optional binary id_str (UTF8);
- optional boolean is_translator;
- optional binary lang (UTF8);
- optional int64 listed_count;
- optional binary name (UTF8);
- optional binary profile_background_color (UTF8);
- optional binary profile_background_image_url (UTF8);
- optional binary profile_background_image_url_https (UTF8);
- optional boolean profile_background_tile;
- optional binary profile_banner_url (UTF8);
- optional binary profile_image_url (UTF8);
- optional binary profile_image_url_https (UTF8);
- optional binary profile_link_color (UTF8);
- optional binary profile_sidebar_border_color (UTF8);
- optional binary profile_sidebar_fill_color (UTF8);
- optional binary profile_text_color (UTF8);
- optional boolean profile_use_background_image;
- optional boolean protected;
- optional binary screen_name (UTF8);
- optional int64 statuses_count;
- optional boolean verified;
- }
- }
- optional int64 quoted_status_id;
- optional binary quoted_status_id_str (UTF8);
- optional int64 retweet_count;
- optional boolean retweeted;
- optional binary source (UTF8);
- optional binary text (UTF8);
- optional binary timestamp_ms (UTF8);
- optional boolean truncated;
- optional group user {
- optional boolean contributors_enabled;
- optional binary created_at (UTF8);
- optional boolean default_profile;
- optional boolean default_profile_image;
- optional binary description (UTF8);
- optional int64 favourites_count;
- optional int64 followers_count;
- optional int64 friends_count;
- optional boolean geo_enabled;
- optional int64 id;
- optional binary id_str (UTF8);
- optional boolean is_translator;
- optional binary lang (UTF8);
- optional int64 listed_count;
- optional binary location (UTF8);
- optional binary name (UTF8);
- optional binary profile_background_color (UTF8);
- optional binary profile_background_image_url (UTF8);
- optional binary profile_background_image_url_https (UTF8);
- optional boolean profile_background_tile;
- optional binary profile_banner_url (UTF8);
- optional binary profile_image_url (UTF8);
- optional binary profile_image_url_https (UTF8);
- optional binary profile_link_color (UTF8);
- optional binary profile_sidebar_border_color (UTF8);
- optional binary profile_sidebar_fill_color (UTF8);
- optional binary profile_text_color (UTF8);
- optional boolean profile_use_background_image;
- optional boolean protected;
- optional binary screen_name (UTF8);
- optional int64 statuses_count;
- optional binary time_zone (UTF8);
- optional binary url (UTF8);
- optional int64 utc_offset;
- optional boolean verified;
- }
- }"
+ "format":"parquet"
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
index ef76ad1..9a1c9a4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
@@ -24,18 +24,14 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-utf8")
+TYPE ( { comment:string, id:bigint, name:string } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message spark_schema {
- optional binary comment (UTF8);
- optional int64 id;
- optional binary name (UTF8);
- }"
+ "format":"parquet"
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 8e31aa4..1107dda 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -107,15 +107,21 @@
<test-case FilePath="copy-to/negative">
<compilation-unit name="parquet-error-checks">
<output-dir compare="Text">parquet-error-checks</output-dir>
- <expected-error>HYR0131: Invalid parquet schema provided</expected-error>
+ <expected-error>ASX1079: Compilation error: TYPE() Expression is required for parquet format</expected-error>
<expected-error>ASX0037: Type mismatch: expected value of type integer, but got the value of type BINARY</expected-error>
- <expected-error>HYR0133: Extra field in the result, field 'second' does not exist at 'nested' in the schema</expected-error>
- <expected-error>HYR0132: Result does not follow the schema, group type expected but found primitive type at 'nested'</expected-error>
- <expected-error>HYR0132: Result does not follow the schema, primitive type expected but found group type at 'name'</expected-error>
+ <expected-error>HYR0132: Extra field in the result, field 'second' does not exist at 'nested' in the schema</expected-error>
+ <expected-error>HYR0131: Result does not follow the schema, group type expected but found primitive type at 'nested'</expected-error>
+ <expected-error>HYR0131: Result does not follow the schema, primitive type expected but found group type at 'name'</expected-error>
<expected-error>ASX1201: Storage units expected for the field 'row-group-size' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided 'random'</expected-error>
<expected-error>ASX1201: Storage units expected for the field 'page-size' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided 'random'</expected-error>
<expected-error>ASX1202: Unsupported compression scheme rar. Supported schemes for parquet are [gzip, snappy, zstd]</expected-error>
- </compilation-unit>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1204: 'binary' type not supported in parquet format</expected-error>
+ <expected-error>ASX1205: Invalid Parquet Writer Version provided.Supported values: 1,2</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ </compilation-unit>
</test-case>
<test-case FilePath="copy-to/negative">
<compilation-unit name="empty-over">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 5e59b97..832a7cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -306,6 +306,9 @@
PREFIX_SHOULD_NOT_START_WITH_SLASH(1200),
ILLEGAL_SIZE_PROVIDED(1201),
UNSUPPORTED_WRITER_COMPRESSION_SCHEME(1202),
+ INVALID_PARQUET_SCHEMA(1203),
+ TYPE_UNSUPPORTED_PARQUET_WRITE(1204),
+ INVALID_PARQUET_WRITER_VERSION(1205),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index a07dddf..716dcf6 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -307,6 +307,9 @@
1200 = Prefix should not start with "/". Prefix: '%1$s'
1201 = Storage units expected for the field '%1$s' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided '%2$s'
1202 = Unsupported compression scheme %1$s. Supported schemes for %2$s are %3$s
+1203 = Invalid schema provided: '%1$s'
+1204 = '%1$s' type not supported in parquet format
+1205 = Invalid Parquet Writer Version provided.Supported values: 1,2
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 6a4b336..02c2070 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -211,6 +211,13 @@
public static final String FORMAT_CSV = "csv";
public static final String FORMAT_TSV = "tsv";
public static final String FORMAT_PARQUET = "parquet";
+ public static final String PARQUET_SCHEMA_KEY = "parquet-schema";
+ public static final String PARQUET_WRITER_VERSION_KEY = "version";
+ public static final String PARQUET_WRITER_VERSION_VALUE_1 = "1";
+ public static final String PARQUET_WRITER_VERSION_VALUE_2 = "2";
+ public static final String DUMMY_DATABASE_NAME = "dbname";
+ public static final String DUMMY_TYPE_NAME = "typeName";
+ public static final String DUMMY_DATAVERSE_NAME = "a.b.c";
public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final Set<String> ALL_FORMATS;
public static final Set<String> TEXTUAL_FORMATS;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 0d82dd9..5c3585b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -20,11 +20,15 @@
import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
import static org.apache.asterix.common.exceptions.ErrorCode.MINIMUM_VALUE_ALLOWED_FOR_PARAM;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
import static org.apache.asterix.external.util.ExternalDataConstants.FORMAT_JSON_LOWER_CASE;
import static org.apache.asterix.external.util.ExternalDataConstants.FORMAT_PARQUET;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_PAGE_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_WRITER_MAX_RESULT;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_1;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_2;
import static org.apache.asterix.external.util.ExternalDataConstants.WRITER_MAX_RESULT_MINIMUM;
import java.util.List;
@@ -75,6 +79,18 @@
validateParquetCompression(configuration, sourceLocation);
validateParquetRowGroupSize(configuration);
validateParquetPageSize(configuration);
+ validateVersion(configuration);
+ }
+
+ private static void validateVersion(Map<String, String> configuration) throws CompilationException { // Validates the optional parquet writer "version" property.
+ String version = configuration.get(PARQUET_WRITER_VERSION_KEY);
+ if (version == null) { // property is optional; absence falls back to the writer default
+ return;
+ }
+ if (version.equals(PARQUET_WRITER_VERSION_VALUE_1) || version.equals(PARQUET_WRITER_VERSION_VALUE_2)) { // only "1" and "2" are accepted
+ return;
+ }
+ throw CompilationException.create(ErrorCode.INVALID_PARQUET_WRITER_VERSION); // ASX1205
+ }
private static void validateParquetRowGroupSize(Map<String, String> configuration) throws CompilationException {
@@ -150,7 +166,7 @@
}
if (value == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, sourceLocation, paramKey);
+ throw new CompilationException(PARAMETERS_REQUIRED, sourceLocation, paramKey);
}
String normalizedValue = value.toLowerCase();
@@ -185,7 +201,7 @@
}
if (value == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, sourceLocation, paramKey);
+ throw new CompilationException(PARAMETERS_REQUIRED, sourceLocation, paramKey);
}
String normalizedValue = value.toLowerCase();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ca2ad55..ba7a1ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -19,8 +19,6 @@
package org.apache.asterix.external.writer.printer;
-import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
-
import java.io.IOException;
import java.io.OutputStream;
@@ -30,40 +28,38 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
public class ParquetExternalFilePrinter implements IExternalPrinter {
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
- private String schemaString;
private MessageType schema;
private ParquetOutputFile parquetOutputFile;
+ private String parquetSchemaString;
private ParquetWriter<IValueReference> writer;
private final long rowGroupSize;
private final int pageSize;
+ private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String schemaString, IAType typeInfo,
- long rowGroupSize, int pageSize) {
+ public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String parquetSchemaString,
+ IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.schemaString = schemaString.replace('\r', ' ');
+ this.parquetSchemaString = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
+ this.writerVersion = writerVersion;
}
@Override
public void open() throws HyracksDataException {
- try {
- this.schema = parseMessageType(schemaString);
- } catch (IllegalArgumentException e) {
- throw new HyracksDataException(ErrorCode.ILLGEAL_PARQUET_SCHEMA);
- }
+ schema = MessageTypeParser.parseMessageType(parquetSchemaString);
}
@Override
@@ -78,8 +74,8 @@
writer = AsterixParquetWriter.builder(parquetOutputFile).withCompressionCodec(compressionCodecName)
.withType(schema).withTypeInfo(typeInfo).withRowGroupSize(rowGroupSize).withPageSize(pageSize)
.withDictionaryPageSize(ExternalDataConstants.PARQUET_DICTIONARY_PAGE_SIZE)
- .enableDictionaryEncoding().withValidation(false)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0).withConf(conf).build();
+ .enableDictionaryEncoding().withValidation(false).withWriterVersion(writerVersion).withConf(conf)
+ .build();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index 53975d2..5ccd2fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -21,27 +21,31 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory {
private static final long serialVersionUID = 8971234908711234L;
- private final String schema;
+ private final String parquetSchemaString;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
private final int pageSize;
+ private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String schema, IAType typeInfo,
- long rowGroupSize, int pageSize) {
+ public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String parquetSchemaString,
+ IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.schema = schema;
+ this.parquetSchemaString = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
+ this.writerVersion = writerVersion;
}
@Override
public IExternalPrinter createPrinter() {
- return new ParquetExternalFilePrinter(compressionCodecName, schema, typeInfo, rowGroupSize, pageSize);
+ return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize,
+ pageSize, writerVersion);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
new file mode 100644
index 0000000..410c951
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import java.util.Map;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class AsterixParquetTypeMap { // Static lookup tables from Asterix type tags to Parquet schema types.
+
+ public static final Map<ATypeTag, PrimitiveType.PrimitiveTypeName> PRIMITIVE_TYPE_NAME_MAP = // Asterix scalar tag -> Parquet physical (primitive) type
+ Map.ofEntries(Map.entry(ATypeTag.BOOLEAN, PrimitiveType.PrimitiveTypeName.BOOLEAN),
+ Map.entry(ATypeTag.STRING, PrimitiveType.PrimitiveTypeName.BINARY),
+ Map.entry(ATypeTag.TINYINT, PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.SMALLINT, PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.INTEGER, PrimitiveType.PrimitiveTypeName.INT64), // NOTE(review): INTEGER maps to INT64 while TINYINT/SMALLINT map to INT32 — confirm the widening is intended
+ Map.entry(ATypeTag.BIGINT, PrimitiveType.PrimitiveTypeName.INT64),
+ Map.entry(ATypeTag.FLOAT, PrimitiveType.PrimitiveTypeName.FLOAT),
+ Map.entry(ATypeTag.DOUBLE, PrimitiveType.PrimitiveTypeName.DOUBLE),
+ Map.entry(ATypeTag.DATE, PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.TIME, PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.DATETIME, PrimitiveType.PrimitiveTypeName.INT64));
+
+ public static final Map<ATypeTag, LogicalTypeAnnotation> LOGICAL_TYPE_ANNOTATION_MAP = // tags absent from this map carry no logical annotation
+ Map.ofEntries(Map.entry(ATypeTag.STRING, LogicalTypeAnnotation.stringType()),
+ Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()),
+ Map.entry(ATypeTag.TIME,
+ LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), // millisecond precision; first arg marks values as UTC-adjusted
+ Map.entry(ATypeTag.DATETIME,
+ LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)));
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
new file mode 100644
index 0000000..b25d1f5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class SchemaConverterVisitor implements IATypeVisitor<Void, Pair<Types.Builder, String>> { // Converts an Asterix record type to a Parquet message schema; every field is emitted as optional.
+ public static String MESSAGE_NAME = "asterix_schema"; // root name of the generated message; NOTE(review): mutable public static — consider "public static final"
+ private final ARecordType schemaType;
+ private ATypeTag unsupportedType; // first type tag seen with no Parquet mapping, null if none
+
+ private SchemaConverterVisitor(ARecordType schemaType) {
+ this.schemaType = schemaType;
+ this.unsupportedType = null;
+ }
+
+ public static String convertToParquetSchemaString(ARecordType schemaType) throws CompilationException { // Entry point: renders the converted schema in Parquet's textual format.
+ SchemaConverterVisitor schemaConverterVisitor = new SchemaConverterVisitor(schemaType);
+ return schemaConverterVisitor.getParquetSchema().toString();
+ }
+
+ private MessageType getParquetSchema() throws CompilationException {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+
+ for (int i = 0; i < schemaType.getFieldNames().length; i++) {
+ String fieldName = schemaType.getFieldNames()[i];
+ IAType childType = schemaType.getFieldType(fieldName);
+ childType.accept(this, new Pair<>(builder, fieldName));
+ if (unsupportedType != null) { // fail on the first field (possibly nested) with no Parquet mapping
+ throw new CompilationException(ErrorCode.TYPE_UNSUPPORTED_PARQUET_WRITE, unsupportedType.toString()); // ASX1204
+ }
+ }
+ return builder.named(MESSAGE_NAME);
+ }
+
+ @Override
+ public Void visit(ARecordType recordType, Pair<Types.Builder, String> arg) { // Nested record -> optional Parquet group named after the field.
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ Types.BaseGroupBuilder childBuilder = getGroupChild(builder);
+ for (int i = 0; i < recordType.getFieldNames().length; i++) {
+ String childFieldName = recordType.getFieldNames()[i];
+ IAType childType = recordType.getFieldType(childFieldName);
+
+ childType.accept(this, new Pair<>(childBuilder, childFieldName));
+
+ }
+ childBuilder.named(fieldName);
+
+ return null;
+ }
+
+ @Override
+ public Void visit(AbstractCollectionType collectionType, Pair<Types.Builder, String> arg) { // Array/multiset -> optional Parquet list; the item type supplies the element.
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ Types.BaseListBuilder childBuilder = getListChild(builder);
+ IAType child = collectionType.getItemType();
+ child.accept(this, new Pair<>(childBuilder, fieldName));
+
+ return null;
+ }
+
+ @Override
+ public Void visit(AUnionType unionType, Pair<Types.Builder, String> arg) {
+ // Shouldn't reach here.
+ return null;
+ }
+
+ @Override
+ public Void visitFlat(IAType flatType, Pair<Types.Builder, String> arg) { // Scalar -> optional Parquet primitive, annotated where a logical type applies.
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ PrimitiveType.PrimitiveTypeName primitiveTypeName =
+ AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(flatType.getTypeTag());
+
+ if (primitiveTypeName == null) {
+ unsupportedType = flatType.getTypeTag(); // recorded here; getParquetSchema() raises the error afterwards
+ }
+
+ LogicalTypeAnnotation logicalTypeAnnotation =
+ AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP.get(flatType.getTypeTag());
+
+ getPrimitiveChild(builder, primitiveTypeName, logicalTypeAnnotation).named(fieldName); // NOTE(review): still executed when primitiveTypeName is null — confirm the builder tolerates it until the unsupported-type check fires
+
+ return null;
+ }
+
+ private static Types.BaseGroupBuilder getGroupChild(Types.Builder parent) { // Opens an optional group under a group or list parent; null for any other parent kind.
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalGroupElement();
+ } else {
+ return null;
+ }
+ }
+
+ private static Types.BaseListBuilder getListChild(Types.Builder parent) { // Opens an optional list under a group or list parent; null for any other parent kind.
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalListElement();
+ } else {
+ return null;
+ }
+ }
+
+ private static Types.Builder getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type,
+ LogicalTypeAnnotation annotation) { // Adds an optional primitive under a group or list parent; null for any other parent kind.
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation);
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation);
+ } else {
+ return null;
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 599d528..5bc4a71 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -32,6 +32,8 @@
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.clause.OrderbyClause;
import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordTypeDefinition;
+import org.apache.asterix.lang.common.expression.TypeExpression;
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -52,13 +54,14 @@
private List<Expression> partitionExpressions;
private List<Expression> orderByList;
private int varCounter;
+ private RecordTypeDefinition itemType;
public CopyToStatement(Namespace namespace, String datasetName, Query query, VariableExpr sourceVariable,
ExternalDetailsDecl externalDetailsDecl, int varCounter, List<Expression> keyExpressions,
boolean autogenerated) {
this(namespace, datasetName, query, sourceVariable, externalDetailsDecl, new ArrayList<>(), new ArrayList<>(),
new HashMap<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), varCounter, keyExpressions,
- autogenerated);
+ autogenerated, null);
}
public CopyToStatement(Namespace namespace, String datasetName, Query query, VariableExpr sourceVariable,
@@ -68,7 +71,18 @@
List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int varCounter) {
this(namespace, datasetName, query, sourceVariable, externalDetailsDecl, pathExpressions, partitionExpressions,
partitionsVariables, orderbyList, orderByModifiers, orderByNullModifierList, varCounter,
- new ArrayList<>(), false);
+ new ArrayList<>(), false, null);
+ }
+
+ public CopyToStatement(Namespace namespace, String datasetName, Query query, VariableExpr sourceVariable,
+ ExternalDetailsDecl externalDetailsDecl, List<Expression> pathExpressions,
+ List<Expression> partitionExpressions, Map<Integer, VariableExpr> partitionsVariables,
+ List<Expression> orderbyList, List<OrderbyClause.OrderModifier> orderByModifiers,
+ List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int varCounter,
+ RecordTypeDefinition itemType) {
+ this(namespace, datasetName, query, sourceVariable, externalDetailsDecl, pathExpressions, partitionExpressions,
+ partitionsVariables, orderbyList, orderByModifiers, orderByNullModifierList, varCounter,
+ new ArrayList<>(), false, itemType);
}
private CopyToStatement(Namespace namespace, String datasetName, Query query, VariableExpr sourceVariable,
@@ -76,7 +90,7 @@
List<Expression> partitionExpressions, Map<Integer, VariableExpr> partitionsVariables,
List<Expression> orderbyList, List<OrderbyClause.OrderModifier> orderByModifiers,
List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int varCounter,
- List<Expression> keyExpressions, boolean autogenerated) {
+ List<Expression> keyExpressions, boolean autogenerated, RecordTypeDefinition itemType) {
this.namespace = namespace;
this.datasetName = datasetName;
this.query = query;
@@ -91,6 +105,7 @@
this.varCounter = varCounter;
this.keyExpressions = keyExpressions != null ? keyExpressions : new ArrayList<>();
this.autogenerated = autogenerated;
+ this.itemType = itemType;
if (pathExpressions.isEmpty()) {
// Ensure path expressions to have at least an empty string
@@ -117,6 +132,10 @@
this.namespace = namespace;
}
+ public RecordTypeDefinition getType() {
+ return itemType;
+ }
+
public Namespace getNamespace() {
return namespace;
}
@@ -192,6 +211,10 @@
return !orderByList.isEmpty();
}
+ public TypeExpression getItemType() { // NOTE(review): returns the same field as getType() with a wider static type — consider consolidating the two getters
+ return itemType;
+ }
+
@Override
public int getVarCounter() {
return varCounter;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 4276a14..1cfa892 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -2936,6 +2936,7 @@
Namespace namespace = nameComponents == null ? null : nameComponents.first;
String datasetName = nameComponents == null ? null : nameComponents.second.getValue();
List<Expression> pathExprs;
+ RecordTypeDefinition typeExpr = null;
List<Expression> partitionExprs = new ArrayList<Expression>();
Map<Integer, VariableExpr> partitionVarExprs = new HashMap<Integer, VariableExpr>();
@@ -2947,6 +2948,7 @@
<TO> adapterName = AdapterName()
<PATH> <LEFTPAREN> pathExprs = ExpressionList() <RIGHTPAREN>
(CopyToOverClause(partitionExprs, partitionVarExprs, orderbyList, orderbyModifierList, orderbyNullModifierList))?
+ (<TYPE> <LEFTPAREN> typeExpr = RecordTypeDef() <RIGHTPAREN>)?
<WITH> withRecord = RecordConstructor()
{
ExternalDetailsDecl edd = new ExternalDetailsDecl();
@@ -2961,8 +2963,7 @@
usedAlias = new VariableExpr(SqlppVariableUtil.toInternalVariableIdentifier(datasetName));
}
- CopyToStatement stmt = new CopyToStatement(namespace, datasetName, query, usedAlias, edd, pathExprs,
- partitionExprs, partitionVarExprs, orderbyList, orderbyModifierList, orderbyNullModifierList, getVarCounter());
+ CopyToStatement stmt = new CopyToStatement(namespace, datasetName, query, usedAlias, edd, pathExprs, partitionExprs, partitionVarExprs, orderbyList, orderbyModifierList, orderbyNullModifierList, getVarCounter(), typeExpr);
return addSourceLocation(stmt, startToken);
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index c931a93..23b9f93 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -25,6 +25,7 @@
import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
@@ -44,6 +45,7 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.util.StorageUtil;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ExternalWriterProvider {
@@ -111,7 +113,7 @@
}
public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
- Object sourceType) {
+ Object sourceType) throws CompilationException {
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -130,11 +132,8 @@
IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
case ExternalDataConstants.FORMAT_PARQUET:
+ String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
- if (!configuration.containsKey(ExternalDataConstants.KEY_SCHEMA)) {
- throw new UnsupportedOperationException("Schema not provided for parquet");
- }
- String schema = configuration.get(ExternalDataConstants.KEY_SCHEMA);
CompressionCodecName compressionCodecName;
if (compression == null || compression.equals("") || compression.equals("none")) {
compressionCodecName = CompressionCodecName.UNCOMPRESSED;
@@ -148,13 +147,26 @@
long rowGroupSize = StorageUtil.getByteValue(rowGroupSizeString);
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
- return new ParquetExternalFilePrinterFactory(compressionCodecName, schema, (IAType) sourceType,
- rowGroupSize, pageSize);
+ ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);
+
+ return new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+ (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
default:
throw new UnsupportedOperationException("Unsupported format " + format);
}
}
+ private static ParquetProperties.WriterVersion getParquetWriterVersion(Map<String, String> configuration) {
+
+ if (configuration.get(ExternalDataConstants.PARQUET_WRITER_VERSION_KEY) == null) {
+ return ParquetProperties.WriterVersion.PARQUET_1_0;
+ } else if (configuration.get(ExternalDataConstants.PARQUET_WRITER_VERSION_KEY)
+ .equals(ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_2)) {
+ return ParquetProperties.WriterVersion.PARQUET_2_0;
+ } else
+ return ParquetProperties.WriterVersion.PARQUET_1_0;
+ }
+
private static String getRowGroupSize(Map<String, String> configuration) {
return configuration.getOrDefault(ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE,
ExternalDataConstants.PARQUET_DEFAULT_ROW_GROUP_SIZE);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 319d440..02327e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -158,9 +158,8 @@
UNSUPPORTED_WRITE_SPEC(128),
JOB_REJECTED(129),
FRAME_BIGGER_THAN_SORT_MEMORY(130),
- ILLGEAL_PARQUET_SCHEMA(131),
- RESULT_DOES_NOT_FOLLOW_SCHEMA(132),
- EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(133),
+ RESULT_DOES_NOT_FOLLOW_SCHEMA(131),
+ EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 51acdc0..226234f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -148,9 +148,8 @@
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
129 = Job %1$s failed to run. Cluster is not accepting jobs.
130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
-131 = Invalid parquet schema provided
-132 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
-133 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
+131 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
+132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s