[ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
parquet-java SDK doesn't support empty spaces,"=" in their MessageTypeParser.parseMessageType(). Serialised Schema can't be passed onto the ParquetExternalFilePrinterFactory due to this. So the schema is built twice: first time to catch errors during compilation, second time to build the schema.
Ext-ref: MB-65167
Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: <preethampoluparthi@gmail.com>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index a39bc06..6813e84 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -607,6 +607,7 @@
private final boolean autogenerated;
private final ARecordType itemType;
+ private final ARecordType parquetSchema;
public CompiledCopyToStatement(CopyToStatement copyToStatement) {
this.query = copyToStatement.getQuery();
@@ -623,6 +624,7 @@
this.keyExpressions = copyToStatement.getKeyExpressions();
this.autogenerated = copyToStatement.isAutogenerated();
this.itemType = eddDecl.getItemType();
+ this.parquetSchema = eddDecl.getParquetSchema();
}
@Override
@@ -650,6 +652,10 @@
return itemType;
}
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public List<Expression> getPathExpressions() {
return pathExpressions;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 978997c..f8d0ff6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -467,7 +467,7 @@
// Write adapter configuration
WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties(),
- copyTo.getItemType(), expr.getSourceLocation());
+ copyTo.getItemType(), copyTo.getParquetSchema(), expr.getSourceLocation());
// writeOperator
WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 334c074..e1b4bb0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4212,8 +4212,8 @@
DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
- edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
- SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+ edd.setParquetSchema((ARecordType) iaType);
+ SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType);
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp
new file mode 100644
index 0000000..8f9bb53
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+
+CREATE TYPE ColumnType1 AS {
+ id: integer
+ };
+
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp
new file mode 100644
index 0000000..9022632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : create a dataset using year-month-duration as the primary key
+ * Expected Res : Success
+ * Date : 7 May 2013
+ * Issue : 363
+ */
+
+use test;
+/*
+insert into TestCollection({"id":`year-month-duration`("P16Y"), "name": "John"});
+insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"), "name": "Alex"});
+*/
+
+insert into TestCollection({"id":18, "Director=name": "SS Rajamouli", "Director.Age" : 51 ,"Films Made" : ["RRR", "Eega", "Baahubali"] });
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp
new file mode 100644
index 0000000..3da960b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "parquet-field-names1")
+TYPE ( { id:int, `Director=name` : string, `Director.Age` : int ,`Films Made` : [string] } )
+WITH {
+ %template_colons%,
+ %additionalProperties%
+ "format":"parquet"
+};
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp
new file mode 100644
index 0000000..38acb88
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+CREATE TYPE ColumnType2 AS {
+};
+
+
+
+CREATE EXTERNAL DATASET TestDataset1(ColumnType2) USING %adapter%
+(
+ %template%,
+ %additional_Properties%,
+ ("definition"="%path_prefix%copy-to-result/parquet-field-names1/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp
new file mode 100644
index 0000000..86d344c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM TestDataset1 c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp
new file mode 100644
index 0000000..f72a1f5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+select c.* from TestCollection c
+ ) toWriter
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "parquet-field-names2")
+WITH {
+ %template_colons%,
+ %additionalProperties%
+ "format":"parquet"
+ };
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp
new file mode 100644
index 0000000..17003c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+CREATE EXTERNAL DATASET TestDataset2(ColumnType2) USING %adapter%
+(
+ %template%,
+ %additional_Properties%,
+ ("definition"="%path_prefix%copy-to-result/parquet-field-names2/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp
new file mode 100644
index 0000000..ce09a4a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM TestDataset2 c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
new file mode 100644
index 0000000..4566d53
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
@@ -0,0 +1 @@
+{ "id": 18, "Director=name": "SS Rajamouli", "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm
new file mode 100644
index 0000000..fa88a74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm
@@ -0,0 +1 @@
+{ "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ], "Director=name": "SS Rajamouli", "id": 18 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1a05334..04e8d1e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -115,6 +115,16 @@
</compilation-unit>
</test-case>
<test-case FilePath="copy-to">
+ <compilation-unit name="parquet-field-names">
+ <placeholder name="adapter" value="S3" />
+ <placeholder name="pathprefix" value="" />
+ <placeholder name="path_prefix" value="" />
+ <placeholder name="additionalProperties" value='"container":"playground",' />
+ <placeholder name="additional_Properties" value='("container"="playground")' />
+ <output-dir compare="Text">parquet-field-names</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to">
<compilation-unit name="parquet-empty-array">
<placeholder name="adapter" value="S3" />
<placeholder name="pathprefix" value="" />
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
index d754068..5d9ab7f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -53,7 +53,7 @@
public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
MessageType schema = generateSchema(schemaNode);
- printerFactory.setParquetSchemaString(schema.toString());
+ printerFactory.setParquetSchema(schema);
IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory);
return new ExternalFileWriter(resolver, writer, maxResult);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
index b500cbe..25fbdc8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+// Maintains a pool of Parquet writers holding a file, each with its own schema , and writes values to the appropriate writer based on schema.
public class ParquetSchemaInferPoolWriter {
private final ParquetExternalWriterFactory writerFactory;
@@ -57,6 +58,7 @@
if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
return;
} else if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) {
+ // If the schema is growing, close the existing writer and create a new one with the new schema.
schemaNodes.set(i, schemaLazyVisitor.inferSchema(value));
closeWriter(i);
return;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
index 3dbd4d3..7c1c03b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
@@ -76,6 +76,7 @@
}
+ // Schema Inference is done frame wise, i.e., we infer the schema for all the records in frame and write the values with schema inferred until now.
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tupleAccessor.reset(buffer);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ba7a1ee..046c03f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -34,23 +34,22 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
public class ParquetExternalFilePrinter implements IExternalPrinter {
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
- private MessageType schema;
+ private final MessageType schema;
private ParquetOutputFile parquetOutputFile;
- private String parquetSchemaString;
+ // private String parquetSchemaString;
private ParquetWriter<IValueReference> writer;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String parquetSchemaString,
+ public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, MessageType parquetSchemaString,
IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.schema = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -59,7 +58,6 @@
@Override
public void open() throws HyracksDataException {
- schema = MessageTypeParser.parseMessageType(parquetSchemaString);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index b6ad34e..035e49a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -18,25 +18,34 @@
*/
package org.apache.asterix.external.writer.printer;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory {
private static final long serialVersionUID = 8971234908711235L;
- private String parquetSchemaString;
+ // parquetInferSchema is for the case when the schema is inferred from the data, not provided by the user
+ // set During the runtime
+ private transient MessageType parquetInferSchema;
+ // parquetProvidedSchema is for the case when the schema is provided by the user
+ private ARecordType parquetProvidedSchema;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String parquetSchemaString,
- IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+ public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName,
+ ARecordType parquetprovidedSchema, IAType typeInfo, long rowGroupSize, int pageSize,
+ ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.parquetProvidedSchema = parquetprovidedSchema;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -52,13 +61,25 @@
this.writerVersion = writerVersion;
}
- public void setParquetSchemaString(String parquetSchemaString) {
- this.parquetSchemaString = parquetSchemaString;
+ public void setParquetSchema(MessageType parquetInferSchema) {
+ this.parquetInferSchema = parquetInferSchema;
}
@Override
public IExternalPrinter createPrinter() {
- return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize,
- pageSize, writerVersion);
+ if (parquetInferSchema != null) {
+ return new ParquetExternalFilePrinter(compressionCodecName, parquetInferSchema, typeInfo, rowGroupSize,
+ pageSize, writerVersion);
+ }
+
+ MessageType schema;
+ try {
+ schema = SchemaConverterVisitor.convertToParquetSchema(parquetProvidedSchema);
+ } catch (CompilationException e) {
+ // This should not happen, Compilation Exception should be caught at the query-compile time
+ throw new RuntimeException(e);
+ }
+ return new ParquetExternalFilePrinter(compressionCodecName, schema, typeInfo, rowGroupSize, pageSize,
+ writerVersion);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
index 7058bf6..cdf24c6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.util.string.UTF8StringUtil;
+// The Field Names Dictionary will cache the mapping between field name bytes and their corresponding string representations,
+// minimizing the creation of new string objects during field name deserialization while writing to parquet files.
public class FieldNamesDictionary {
private final FieldNamesTrieDictionary trie;
private final List<String> fieldNames;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
index 99b9736..dfa6e4f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
@@ -22,6 +22,10 @@
import org.apache.hyracks.data.std.api.IValueReference;
public interface ISchemaChecker {
+
+ // EQUIVALENT: Example: { name: string, age: int } -> { name: string, age: int }
+ // GROWING: equivalent types but having extra fields, Example: { name: string, age: int } -> { name: string, age: int , address: string }
+ // CONFLICTING: conflict in types, Example: { name: string, age: int } -> { name: {first:string, last:string}, age: int }
enum SchemaComparisonType {
EQUIVALENT,
GROWING,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index 373bfe4..2a03bfd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -44,6 +44,8 @@
private final MessageType schema;
private final RecordLazyVisitablePointable rec;
+ // The Record Consumer is responsible for traversing the record tree,
+ // using recordConsumer.startField() to navigate into a child node and endField() to move back to the parent node.
private RecordConsumer recordConsumer;
private final FieldNamesDictionary fieldNamesDictionary;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
index b591175..70872bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -39,6 +39,7 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
+// This class is used to infer the schema of a record into SchemaNode, which is an internal tree representation of the schema.
public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
private final RecordLazyVisitablePointable rec;
private final FieldNamesDictionary fieldNamesDictionary;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 04e11f7..38d12f9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -44,6 +44,7 @@
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.PrimitiveType;
+//This class reduces the number of Java objects created each time a column is written to a Parquet file by reusing the same VoidPointable for all columns within the file.
public class ParquetValueWriter {
public static final String LIST_FIELD = "list";
public static final String ELEMENT_FIELD = "element";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
index 44cd5b2..fc43c89 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
@@ -30,6 +30,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+// This class is used to check the schema of a record against a schema that has been inferred so far.
+// By checking, we can determine if the record is equivalent to the schema, if the record is growing, or if there is a conflict.
public class SchemaCheckerLazyVisitor implements ISchemaChecker,
ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType, ParquetSchemaTree.SchemaNode> {
private final FieldNamesDictionary fieldNamesDictionary;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index a6ea115..9f5d02f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -36,6 +36,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
+// Traverses the RecordType tree and converts it to a Parquet schema.
public class SchemaConverterVisitor implements IATypeVisitor<Void, Pair<Types.Builder, String>> {
public static String MESSAGE_NAME = "asterix_schema";
private final ARecordType schemaType;
@@ -46,9 +47,9 @@
this.unsupportedType = null;
}
- public static String convertToParquetSchemaString(ARecordType schemaType) throws CompilationException {
+ public static MessageType convertToParquetSchema(ARecordType schemaType) throws CompilationException {
SchemaConverterVisitor schemaConverterVisitor = new SchemaConverterVisitor(schemaType);
- return schemaConverterVisitor.getParquetSchema().toString();
+ return schemaConverterVisitor.getParquetSchema();
}
private MessageType getParquetSchema() throws CompilationException {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index e1c978a..126f3ba 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -26,6 +26,7 @@
private Map<String, String> properties;
private String adapter;
private ARecordType itemType;
+ private ARecordType parquetSchema;
public void setAdapter(String adapter) {
this.adapter = adapter;
@@ -43,6 +44,14 @@
return itemType;
}
+ public void setParquetSchema(ARecordType parquetSchema) {
+ this.parquetSchema = parquetSchema;
+ }
+
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public String getAdapter() {
return adapter;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
index 64f8d6d..1168ba1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
@@ -26,5 +26,7 @@
public interface IExternalWriteDataSink extends IWriteDataSink {
ARecordType getItemType();
+ ARecordType getParquetSchema();
+
SourceLocation getSourceLoc();
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
index d1667bf..4a10f7f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -28,14 +28,16 @@
public class WriteDataSink implements IExternalWriteDataSink {
private final String adapterName;
private final Map<String, String> configuration;
- private ARecordType itemType;
- private SourceLocation sourceLoc;
+ private final ARecordType itemType;
+ private final ARecordType parquetSchema;
+ private final SourceLocation sourceLoc;
public WriteDataSink(String adapterName, Map<String, String> configuration, ARecordType itemType,
- SourceLocation sourceLoc) {
+ ARecordType parquetSchema, SourceLocation sourceLoc) {
this.adapterName = adapterName;
this.configuration = configuration;
this.itemType = itemType;
+ this.parquetSchema = parquetSchema;
this.sourceLoc = sourceLoc;
}
@@ -43,6 +45,7 @@
this.adapterName = writeDataSink.getAdapterName();
this.configuration = new HashMap<>(writeDataSink.configuration);
this.itemType = writeDataSink.itemType;
+ this.parquetSchema = writeDataSink.parquetSchema;
this.sourceLoc = writeDataSink.sourceLoc;
}
@@ -52,6 +55,11 @@
}
@Override
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
+ @Override
public SourceLocation getSourceLoc() {
return sourceLoc;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716df..e88b1de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -205,7 +205,7 @@
case ExternalDataConstants.FORMAT_PARQUET:
CompressionCodecName compressionCodecName;
- if (compression == null || compression.equals("") || compression.equals("none")) {
+ if (compression == null || compression.isEmpty() || compression.equals("none")) {
compressionCodecName = CompressionCodecName.UNCOMPRESSED;
} else {
compressionCodecName = CompressionCodecName.valueOf(compression.toUpperCase());
@@ -218,10 +218,11 @@
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);
- if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
- String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+ ARecordType parquetSchema = ((IExternalWriteDataSink) sink).getParquetSchema();
+
+ if (parquetSchema != null) {
ParquetExternalFilePrinterFactory parquetPrinterFactory =
- new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+ new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema,
(IAType) sourceType, rowGroupSize, pageSize, writerVersion);
ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory,
@@ -230,6 +231,7 @@
partitionComparatorFactories, inputDesc, parquetWriterFactory);
}
+ // Parquet Writing with Schema Inference
int maxSchemas = ExternalWriterProvider.getMaxParquetSchema(configuration);
ParquetExternalFilePrinterFactoryProvider printerFactoryProvider =
new ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType) sourceType,