[ASTERIXDB-3392] Add schema inference in COPY TO parquet
Ext-ref: MB-61498
Details:
Schema will be optional as part of the COPY TO statement while writing in parquet format from this patch. If a schema is not
provided, it will be inferred. If there is a conflict in the schema, the record will be written to a different file.
Change-Id: I67f6ceaa62edd8414dc33c050e6082f11ce0fd3e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18872
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json b/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json
new file mode 100644
index 0000000..e6179d6
--- /dev/null
+++ b/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json
@@ -0,0 +1,100 @@
+{"id": 1, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 2, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 3, "partitioner_key": "C", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 4, "partitioner_key": "C", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 5, "partitioner_key": "A", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 6, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 7, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 8, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 9, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 10, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 11, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 12, "partitioner_key": "B", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 13, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 14, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 15, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 16, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 17, "partitioner_key": "B", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 18, "partitioner_key": "C", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 19, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 20, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 21, "partitioner_key": "B", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 22, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 23, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 24, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 25, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 26, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 27, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 28, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 29, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 30, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 31, "partitioner_key": "C", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 32, "partitioner_key": "B", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 33, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 34, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 35, "partitioner_key": "B", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 36, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 37, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 38, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 39, "partitioner_key": "B", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 40, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 41, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 42, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 43, "partitioner_key": "C", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 44, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 45, "partitioner_key": "A", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 46, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 47, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 48, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 49, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 50, "partitioner_key": "C", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 51, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 52, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 53, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 54, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 55, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 56, "partitioner_key": "B", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 57, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 58, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 59, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 60, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 61, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 62, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 63, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 64, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 65, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 66, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 67, "partitioner_key": "B", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 68, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 69, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 70, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 71, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 72, "partitioner_key": "B", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 73, "partitioner_key": "C", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 74, "partitioner_key": "C", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 75, "partitioner_key": "A", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 76, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 77, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 78, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 79, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 80, "partitioner_key": "A", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 81, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 82, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 83, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 84, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 85, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 86, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 87, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 88, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 89, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 90, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 91, "partitioner_key": "A", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 92, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 93, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 94, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 95, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 96, "partitioner_key": "B", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 97, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 98, "partitioner_key": "B", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 99, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 100, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 555b6b9..17cf78a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4118,19 +4118,16 @@
ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
metadataProvider));
- if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
- .equalsIgnoreCase(ExternalDataConstants.FORMAT_PARQUET)) {
- if (copyTo.getType() == null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR,
- "TYPE() Expression is required for parquet format");
+ if (ExternalDataConstants.FORMAT_PARQUET
+ .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
+ if (copyTo.getType() != null) {
+ DataverseName dataverseName =
+ DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
+ IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+ ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
+ edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
+ SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
}
-
- DataverseName dataverseName =
- DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
- IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
- ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
- edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
- SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
}
if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
index 36d00be..0bfc4eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
@@ -23,7 +23,7 @@
CREATE TYPE ColumnType1 AS {
- id: integer
+ id: int
};
CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
index ce68cbd..8eb14c0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
@@ -20,7 +20,7 @@
USE test;
-insert into TestCollection({"id":1, "name": "John", "nested" : { "first" : "john" , "second":"JOHN" } });
+insert into TestCollection({"id":2, "name": "John", "nested" : { "first" : "john" , "second":"JOHN" } });
COPY (
select c.* from TestCollection c
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
index 281610b..f5d7eda 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
@@ -24,14 +24,15 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks13")
-TYPE ( { id : int, name : string, list : [int,string]} )
+TYPE ( { id : int, name : string, list : [string]} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet"
+ "format":"parquet",
+ "max-schemas" : "yvghc"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
index 1d1c6af..7628044 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
@@ -24,14 +24,15 @@
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks14")
-TYPE ( {id : int, name : string, list : [int |} )
+TYPE ( {id : int, name : string, l : int } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet"
+ "format":"parquet",
+ "max-schemas" : "15"
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
index a26624a..89bfb2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
@@ -19,12 +19,20 @@
USE test;
+
+
+
+
+
+insert into TestCollection({"id":1, "name": []});
+
+
+
COPY (
- select c.* from TestCollection c
-) toWriter
+select c.* from TestCollection c
+ ) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks15")
-TYPE ( {id : int, name : string, list : [int] )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,7 +40,5 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet"
-}
-
-
+ }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
similarity index 64%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
index b6ec758..ad4fc73 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
@@ -19,19 +19,34 @@
USE test;
+
+
+
+insert into TestCollection({"id":10, "name": 1 });
+insert into TestCollection({"id":12, "name": [1] });
+insert into TestCollection({"id":15, "name": [[1]] });
+insert into TestCollection({"id":16, "name": [[[1]]] });
+insert into TestCollection({"id":17, "name": [[[[1]]]] });
+insert into TestCollection({"id":18, "name": [[[[[1]]]]] });
+insert into TestCollection({"id":19, "name": [[[[[[1]]]]]] });
+
+
+
COPY (
- select c.* from TestCollection c
-) toWriter
+select id,name from TestCollection c
+ ) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks16")
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet"
-}
+ "format":"parquet",
+ "max-schemas" : "2"
+ }
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
index b6ec758..00d5c49 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
@@ -17,21 +17,13 @@
* under the License.
*/
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
+CREATE TYPE ColumnType1 AS {
+ id: int
+};
-
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp
new file mode 100644
index 0000000..308d3b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+
+-- INSERT INTO TestCollection( {"id":2,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":5,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":8,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":10,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":12,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":15,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":17,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":20,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":21,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":27,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":28,"name":{"first":"virat"}});
+
+
+insert into TestCollection({"id":2});
+insert into TestCollection({"id":5,"name":"virat"});
+insert into TestCollection({"id":8,"name":{"first":"virat"}});
+insert into TestCollection({"id":10,"name":{"first":"virat"},"age":18});
+insert into TestCollection({"id":12,"name":123});
+insert into TestCollection({"id":15,"name":[123,456]});
+insert into TestCollection({"id":17,"name":765});
+insert into TestCollection({"id":20,"name":[789]});
+insert into TestCollection({"id":21,"name":[{"first":"virat"}]});
+insert into TestCollection({"id":27,"name":[{"first":"virat","second":"kohli"}]});
+insert into TestCollection({"id":28,"name":{"first":"virat"}});
+
+
+
+-- INSERT INTO TestCollection( {"id":1,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":2,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":3,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":4,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":5,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":6,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":7,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":8,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":9,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":10,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":11,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":12,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":13,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":14,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":15,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":16,"name":{"first":"virat"}});
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
similarity index 94%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
index b6ec758..a7e6e16 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
@@ -23,7 +23,7 @@
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-heterogeneous")
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -31,7 +31,4 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet"
-}
-
-
-
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
index b6ec758..95f75c6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
@@ -19,19 +19,19 @@
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
+
+CREATE TYPE ColumnType2 AS {
+};
+CREATE EXTERNAL DATASET TestDataset(ColumnType2) USING S3
+(
+ ("serviceEndpoint"="http://127.0.0.1:8001"),
+ ("region"="us-west-2"),
+ ("container"="playground"),
+ ("definition"="copy-to-result/parquet-heterogeneous/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
index b6ec758..3d26c18 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
@@ -19,19 +19,7 @@
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
-
-
+SELECT c.*
+FROM TestDataset c
+ORDER BY c.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
index b6ec758..2155b4e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
@@ -17,21 +17,19 @@
* under the License.
*/
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
+CREATE TYPE ColumnType1 AS {
+ id: int
+ };
+
+CREATE DATASET ParitionParquetDataset(ColumnType1)
+PRIMARY KEY id WITH {
+ "storage-format": {"format" : "column"}
+};
-
+CREATE TYPE ColumnType2 AS {
+ };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
index b6ec758..4de223e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
@@ -19,19 +19,8 @@
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
-
-
-
+LOAD DATASET ParitionParquetDataset USING localfs
+(
+ ("path" = "asterix_nc1://data/hdfs/parquet/partition_heterogeneous.json"),
+ ("format" = "json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
similarity index 78%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
index b6ec758..948ba17 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
@@ -20,18 +20,20 @@
USE test;
COPY (
- select c.* from TestCollection c
-) toWriter
+SELECT t.* FROM ParitionParquetDataset t
+ ) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-partition-heterogeneous" , partitioner_key , "random" )
+OVER ( PARTITION BY toWriter.partitioner_key AS partitioner_key )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet"
-}
+ "format":"parquet",
+ "max-schemas" : "10"
+ };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
index b6ec758..c626270 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
@@ -19,19 +19,16 @@
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
+CREATE EXTERNAL DATASET ParitionParquetDatasetCopy(ColumnType2) USING S3
+(
+ ("serviceEndpoint"="http://127.0.0.1:8001"),
+ ("region"="us-west-2"),
+ ("container"="playground"),
+ ("definition"="copy-to-result/parquet-partition-heterogeneous/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
index b6ec758..7009f98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
@@ -19,19 +19,9 @@
USE test;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
-}
+SELECT c.*
+FROM ParitionParquetDatasetCopy c
+ORDER BY c.id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm
new file mode 100644
index 0000000..4ecacdb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm
@@ -0,0 +1,11 @@
+{ "id": 2 }
+{ "name": "virat", "id": 5 }
+{ "name": { "first": "virat" }, "id": 8 }
+{ "name": { "first": "virat" }, "id": 10, "age": 18 }
+{ "name": 123, "id": 12 }
+{ "name": [ 123, 456 ], "id": 15 }
+{ "name": 765, "id": 17 }
+{ "name": [ 789 ], "id": 20 }
+{ "name": [ { "first": "virat" } ], "id": 21 }
+{ "name": [ { "first": "virat", "second": "kohli" } ], "id": 27 }
+{ "name": { "first": "virat" }, "id": 28 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm
new file mode 100644
index 0000000..6b73640
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm
@@ -0,0 +1,100 @@
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 1, "randomField": 5678 }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 2, "randomField": 18 }
+{ "partitioner_key": "C", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 3, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 4, "randomField": 5678 }
+{ "partitioner_key": "A", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 5, "randomField": "1234" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 6, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 7, "randomField": 18 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 8, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 9, "randomField": 18 }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 10, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 11, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 12, "randomField": "5678" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 13, "randomField": "None" }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 14, "randomField": 5678 }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 15, "randomField": 18 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 16, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 17, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 18, "randomField": 5678 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 19, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 20, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 21, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 22, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 23, "randomField": 18 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 24, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 25, "randomField": "None" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 26, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 27, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 28, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 29, "randomField": "1234" }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 30, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 31, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 32, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 33, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 34, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 35, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 36, "randomField": "None" }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 37, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 38, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 39, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 40, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 41, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 42, "randomField": "5678" }
+{ "partitioner_key": "C", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 43, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 44, "randomField": 18 }
+{ "partitioner_key": "A", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 45, "randomField": 1234 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 46, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 47, "randomField": "None" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 48, "randomField": "None" }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 49, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 50, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 51, "randomField": "None" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 52, "randomField": "None" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 53, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 54, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 55, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 56, "randomField": 1234 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 57, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 58, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 59, "randomField": "None" }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 60, "randomField": 18 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 61, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 62, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 63, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 64, "randomField": 18 }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 65, "randomField": 5678 }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 66, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 67, "randomField": "5678" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 68, "randomField": { "value": 7890 } }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 69, "randomField": "None" }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 70, "randomField": 18 }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 71, "randomField": 18 }
+{ "partitioner_key": "B", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 72, "randomField": 18 }
+{ "partitioner_key": "C", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 73, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 74, "randomField": 1234 }
+{ "partitioner_key": "A", "price": "101.50", "name": 3456, "active": true, "id": 75, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 76, "randomField": "None" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 77, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 78, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 79, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 80, "randomField": "1234" }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 81, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 82, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 83, "randomField": "5678" }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 84, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 85, "randomField": "None" }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 86, "randomField": "1234" }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 87, "randomField": 18 }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 88, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 89, "randomField": "1234" }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 90, "randomField": "None" }
+{ "partitioner_key": "A", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 91, "randomField": 1234 }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 92, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 93, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 94, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 95, "randomField": "None" }
+{ "partitioner_key": "B", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 96, "randomField": 1234 }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 97, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 98, "randomField": "1234" }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 99, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 100, "randomField": "1234" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 80628ab..ecbd4b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -50,11 +50,21 @@
</compilation-unit>
</test-case>
<test-case FilePath="copy-to">
+ <compilation-unit name="parquet-partition-heterogeneous">
+ <output-dir compare="Text">parquet-partition-heterogeneous</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to">
<compilation-unit name="parquet-utf8">
<output-dir compare="Text">parquet-utf8</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="copy-to">
+ <compilation-unit name="parquet-heterogeneous">
+ <output-dir compare="Text">parquet-heterogeneous</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to">
<compilation-unit name="parquet-cover-data-types">
<output-dir compare="Text">parquet-cover-data-types</output-dir>
</compilation-unit>
@@ -112,8 +122,7 @@
<test-case FilePath="copy-to/negative">
<compilation-unit name="parquet-error-checks">
<output-dir compare="Text">parquet-error-checks</output-dir>
- <expected-error>ASX1079: Compilation error: TYPE() Expression is required for parquet format</expected-error>
- <expected-error>ASX0037: Type mismatch: expected value of type integer, but got the value of type BINARY</expected-error>
+ <expected-error>ASX0037: Type mismatch: expected value of type BINARY, but got the value of type bigint</expected-error>
<expected-error>HYR0132: Extra field in the result, field 'second' does not exist at 'nested' in the schema</expected-error>
<expected-error>HYR0131: Result does not follow the schema, group type expected but found primitive type at 'nested'</expected-error>
<expected-error>HYR0131: Result does not follow the schema, primitive type expected but found group type at 'name'</expected-error>
@@ -123,9 +132,10 @@
<expected-error>ASX1001: Syntax error</expected-error>
<expected-error>ASX1204: 'binary' type not supported in parquet format</expected-error>
<expected-error>ASX1205: Invalid Parquet Writer Version provided '3'. Supported values: [1, 2]</expected-error>
- <expected-error>ASX1001: Syntax error</expected-error>
- <expected-error>ASX1001: Syntax error</expected-error>
- <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX0039: Expected integer value, got yvghc (in line 22, at column 6)</expected-error>
+ <expected-error>ASX1209: Maximum value allowed for 'max-schemas' is 10. Found 15</expected-error>
+ <expected-error>HYR0133: Schema could not be inferred, empty types found in the result</expected-error>
+ <expected-error>HYR0134: Schema Limit exceeded, maximum number of heterogeneous schemas allowed : '2'</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="copy-to/negative">
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
new file mode 100644
index 0000000..d754068
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor.generateSchema;
+
+import java.io.Serializable;
+
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree;
+import org.apache.asterix.runtime.writer.ExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IPathResolver;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetExternalWriterFactory implements Serializable {
+
+ private static final long serialVersionUID = 8971234908711236L;
+ private final IExternalFileWriterFactory writerFactory;
+ private final int maxResult;
+ private final ParquetExternalFilePrinterFactory printerFactory;
+ private final IPathResolver resolver;
+ private final IHyracksTaskContext ctx;
+
+ public ParquetExternalWriterFactory(IHyracksTaskContext ctx, IExternalFileWriterFactory writerFactory,
+ int maxResult, ParquetExternalFilePrinterFactory printerFactory, IPathResolver resolver) {
+ this.ctx = ctx;
+ this.writerFactory = writerFactory;
+ this.maxResult = maxResult;
+ this.printerFactory = printerFactory;
+ this.resolver = resolver;
+ }
+
+ public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+ MessageType schema = generateSchema(schemaNode);
+ printerFactory.setParquetSchemaString(schema.toString());
+ IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory);
+ return new ExternalFileWriter(resolver, writer, maxResult);
+ }
+
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
new file mode 100644
index 0000000..b500cbe
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.writer.printer.parquet.ISchemaChecker;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ParquetSchemaInferPoolWriter {
+ private final ParquetExternalWriterFactory writerFactory;
+
+ private List<ParquetSchemaTree.SchemaNode> schemaNodes;
+ private List<IExternalWriter> writerList;
+ private final int maxSchemas;
+ private ISchemaChecker schemaChecker;
+ private ParquetSchemaLazyVisitor schemaLazyVisitor;
+
+ public ParquetSchemaInferPoolWriter(ParquetExternalWriterFactory writerFactory, ISchemaChecker schemaChecker,
+ ParquetSchemaLazyVisitor parquetSchemaLazyVisitor, int maxSchemas) {
+ this.writerFactory = writerFactory;
+ this.schemaChecker = schemaChecker;
+ this.schemaLazyVisitor = parquetSchemaLazyVisitor;
+ this.maxSchemas = maxSchemas;
+ this.schemaNodes = new ArrayList<>();
+ this.writerList = new ArrayList<>();
+ }
+
+ public void inferSchema(IValueReference value) throws HyracksDataException {
+ for (int i = 0; i < schemaNodes.size(); i++) {
+ ISchemaChecker.SchemaComparisonType schemaComparisonType =
+ schemaChecker.checkSchema(schemaNodes.get(i), value);
+
+ if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
+ return;
+ } else if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) {
+ schemaNodes.set(i, schemaLazyVisitor.inferSchema(value));
+ closeWriter(i);
+ return;
+ }
+ }
+
+ if (schemaNodes.size() == maxSchemas) {
+ throw new HyracksDataException(ErrorCode.SCHEMA_LIMIT_EXCEEDED, maxSchemas);
+ }
+ schemaNodes.add(schemaLazyVisitor.inferSchema(value));
+ writerList.add(null);
+ }
+
+ public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+ for (int i = 0; i < writerList.size(); i++) {
+ createOrInitPartition(i, tuple);
+ }
+ }
+
+ public void write(IValueReference value) throws HyracksDataException {
+ for (int i = 0; i < schemaNodes.size(); i++) {
+ if (schemaChecker.checkSchema(schemaNodes.get(i), value)
+ .equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
+ createOrWrite(i, value);
+ return;
+ }
+ }
+ }
+
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < writerList.size(); i++) {
+ closeWriter(i);
+ }
+ }
+
+ private void createOrInitPartition(int index, IFrameTupleReference tupleReference) throws HyracksDataException {
+ if (writerList.get(index) == null) {
+ createWriter(index);
+ }
+ writerList.get(index).initNewPartition(tupleReference);
+ }
+
+ private void createOrWrite(int index, IValueReference value) throws HyracksDataException {
+ if (writerList.get(index) == null) {
+ createWriter(index);
+ }
+ writerList.get(index).write(value);
+ }
+
+ private void createWriter(int index) throws HyracksDataException {
+ writerList.set(index, writerFactory.createWriter(schemaNodes.get(index)));
+ writerList.get(index).open();
+ }
+
+ private void closeWriter(int index) throws HyracksDataException {
+ if (writerList.get(index) != null) {
+ writerList.get(index).close();
+ writerList.set(index, null);
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java
new file mode 100644
index 0000000..3a276a2
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactoryProvider;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IPathResolverFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.WriterPartitionerFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ParquetSinkExternalWriterFactory extends AbstractPushRuntimeFactory {
+ private static final long serialVersionUID = -1285997525553685225L;
+ private final WriterPartitionerFactory partitionerFactory;
+ private final RecordDescriptor inputRecordDesc;
+ private final int sourceColumn;
+ private final int maxSchemas;
+ private final IAType sourceType;
+ private final IExternalFileWriterFactory writerFactory;
+ private final int maxResult;
+ private final ParquetExternalFilePrinterFactoryProvider printerFactoryProvider;
+ private final IPathResolverFactory pathResolverFactory;
+
+ public ParquetSinkExternalWriterFactory(WriterPartitionerFactory partitionerFactory,
+ RecordDescriptor inputRecordDesc, int sourceColumn, IAType sourceType, int maxSchemas,
+ IExternalFileWriterFactory writerFactory, int maxResult,
+ ParquetExternalFilePrinterFactoryProvider printerFactoryProvider,
+ IPathResolverFactory pathResolverFactory) {
+ this.partitionerFactory = partitionerFactory;
+ this.inputRecordDesc = inputRecordDesc;
+ this.sourceColumn = sourceColumn;
+ this.sourceType = sourceType;
+ this.maxSchemas = maxSchemas;
+ this.writerFactory = writerFactory;
+ this.maxResult = maxResult;
+ this.printerFactoryProvider = printerFactoryProvider;
+ this.pathResolverFactory = pathResolverFactory;
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ ParquetExternalFilePrinterFactory printerFactory =
+ (ParquetExternalFilePrinterFactory) printerFactoryProvider.createPrinterFactory();
+ ParquetExternalWriterFactory parquetExternalWriterFactory = new ParquetExternalWriterFactory(ctx, writerFactory,
+ maxResult, printerFactory, pathResolverFactory.createResolver(ctx));
+ ParquetSinkExternalWriterRuntime runtime =
+ new ParquetSinkExternalWriterRuntime(sourceColumn, partitionerFactory.createPartitioner(),
+ inputRecordDesc, parquetExternalWriterFactory, sourceType, maxSchemas);
+ return new IPushRuntime[] { runtime };
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
new file mode 100644
index 0000000..3dbd4d3
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor;
+import org.apache.asterix.external.writer.printer.parquet.SchemaCheckerLazyVisitor;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.writer.IWriterPartitioner;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ParquetSinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+ private final int sourceColumn;
+ private final IWriterPartitioner partitioner;
+ private final IPointable sourceValue;
+ private final ParquetExternalWriterFactory writerFactory;
+ private FrameTupleAccessor tupleAccessor;
+ private FrameTupleReference tupleRef;
+ private IFrameWriter frameWriter;
+ private final int maxSchemas;
+ private final IAType sourceType;
+ private ParquetSchemaInferPoolWriter poolWriter;
+
+ public ParquetSinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner partitioner,
+ RecordDescriptor inputRecordDesc, ParquetExternalWriterFactory writerFactory, IAType sourceType,
+ int maxSchemas) {
+ this.sourceColumn = sourceColumn;
+ this.partitioner = partitioner;
+ this.sourceValue = new VoidPointable();
+ this.inputRecordDesc = inputRecordDesc;
+ this.writerFactory = writerFactory;
+ this.sourceType = sourceType;
+ this.maxSchemas = maxSchemas;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter frameWriter, RecordDescriptor recordDesc) {
+ this.frameWriter = frameWriter;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (tupleAccessor == null) {
+ tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
+ tupleRef = new FrameTupleReference();
+ }
+
+ poolWriter = new ParquetSchemaInferPoolWriter(writerFactory, new SchemaCheckerLazyVisitor(sourceType),
+ new ParquetSchemaLazyVisitor(sourceType), maxSchemas);
+ this.frameWriter.open();
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tupleAccessor.reset(buffer);
+
+ for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+ tupleRef.reset(tupleAccessor, i);
+ setValue(tupleRef, sourceColumn, sourceValue);
+ poolWriter.inferSchema(sourceValue);
+ }
+
+ for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+ tupleRef.reset(tupleAccessor, i);
+ setValue(tupleRef, sourceColumn, sourceValue);
+ if (partitioner.isNewPartition(tupleAccessor, i)) {
+ poolWriter.initNewPartition(tupleRef);
+ }
+ poolWriter.write(sourceValue);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ frameWriter.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ poolWriter.close();
+ frameWriter.close();
+ }
+
+ private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
+ byte[] data = tuple.getFieldData(column);
+ int start = tuple.getFieldStart(column);
+ int length = tuple.getFieldLength(column);
+ value.set(data, start, length);
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 550015d..b955fa2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -311,6 +311,8 @@
ILLEGAL_SIZE_PROVIDED(1206),
TYPE_UNSUPPORTED_CSV_WRITE(1207),
INVALID_CSV_SCHEMA(1208),
+ MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
+
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE(3002),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 2350c2c..e450657 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -313,6 +313,7 @@
1206 = Storage units expected for the field '%1$s' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided '%2$s'
1207 = '%1$s' type not supported in csv format
1208 = Invalid Copy to CSV schema
+1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 18d5158..1329a29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -351,6 +351,9 @@
List.of(ATypeTag.TINYINT, ATypeTag.SMALLINT, ATypeTag.INTEGER, ATypeTag.BIGINT, ATypeTag.UINT8,
ATypeTag.UINT16, ATypeTag.UINT64, ATypeTag.FLOAT, ATypeTag.DOUBLE, ATypeTag.STRING,
ATypeTag.BOOLEAN, ATypeTag.DATETIME, ATypeTag.UINT32, ATypeTag.DATE, ATypeTag.TIME);
+ public static final String PARQUET_MAX_SCHEMAS_KEY = "max-schemas";
+ public static final int PARQUET_MAX_SCHEMAS_DEFAULT_VALUE = 5;
+ public static final int PARQUET_MAX_SCHEMAS_MAX_VALUE = 10;
static {
WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, FORMAT_PARQUET, FORMAT_CSV_LOWER_CASE);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 3de4067..380a9a8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.util;
import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.common.exceptions.ErrorCode.MAXIMUM_VALUE_ALLOWED_FOR_PARAM;
import static org.apache.asterix.common.exceptions.ErrorCode.MINIMUM_VALUE_ALLOWED_FOR_PARAM;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
import static org.apache.asterix.external.util.ExternalDataConstants.FORMAT_CSV;
@@ -27,6 +28,8 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_PAGE_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_WRITER_MAX_RESULT;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_MAX_SCHEMAS_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_MAX_SCHEMAS_MAX_VALUE;
import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_KEY;
import static org.apache.asterix.external.util.ExternalDataConstants.WRITER_MAX_RESULT_MINIMUM;
@@ -91,6 +94,7 @@
validateParquetRowGroupSize(configuration);
validateParquetPageSize(configuration);
validateVersion(configuration, sourceLocation);
+ validateMaxParquetSchemas(configuration, sourceLocation);
}
private static void validateVersion(Map<String, String> configuration, SourceLocation sourceLocation)
@@ -175,6 +179,24 @@
}
}
+ private static void validateMaxParquetSchemas(Map<String, String> configuration, SourceLocation sourceLocation)
+ throws CompilationException {
+ String maxResult = configuration.get(PARQUET_MAX_SCHEMAS_KEY);
+ if (maxResult == null) {
+ return;
+ }
+
+ try {
+ int value = Integer.parseInt(maxResult);
+ if (value > PARQUET_MAX_SCHEMAS_MAX_VALUE) {
+ throw new CompilationException(MAXIMUM_VALUE_ALLOWED_FOR_PARAM, PARQUET_MAX_SCHEMAS_KEY,
+ PARQUET_MAX_SCHEMAS_MAX_VALUE, value);
+ }
+ } catch (NumberFormatException e) {
+ throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, maxResult);
+ }
+ }
+
private static void checkSupported(String paramKey, String value, Set<String> supportedSet, ErrorCode errorCode,
SourceLocation sourceLocation, boolean optional) throws CompilationException {
if (optional && value == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index 5ccd2fe..b6ad34e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -25,8 +25,8 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory {
- private static final long serialVersionUID = 8971234908711234L;
- private final String parquetSchemaString;
+ private static final long serialVersionUID = 8971234908711235L;
+ private String parquetSchemaString;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
@@ -43,6 +43,19 @@
this.writerVersion = writerVersion;
}
+    /**
+     * Schema-less variant used by the schema-inference path: parquetSchemaString is
+     * left null here and must be supplied via setParquetSchemaString(...) before
+     * createPrinter() is invoked.
+     */
+    public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, IAType typeInfo,
+            long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+        this.compressionCodecName = compressionCodecName;
+        this.typeInfo = typeInfo;
+        this.rowGroupSize = rowGroupSize;
+        this.pageSize = pageSize;
+        this.writerVersion = writerVersion;
+    }
+
+    // NOTE(review): this mutates a Serializable factory after construction. It assumes the
+    // schema is set before createPrinter() runs and that one factory instance is not shared
+    // concurrently across writers with different schemas — TODO confirm with callers
+    // (ParquetExternalWriterFactory.createWriter sets it immediately before creating a writer).
+    public void setParquetSchemaString(String parquetSchemaString) {
+        this.parquetSchemaString = parquetSchemaString;
+    }
+
@Override
public IExternalPrinter createPrinter() {
return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java
new file mode 100644
index 0000000..bd70853
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.Serializable;
+
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class ParquetExternalFilePrinterFactoryProvider implements Serializable {
+ private static final long serialVersionUID = 8971234908711237L;
+ private final IAType typeInfo;
+ private final CompressionCodecName compressionCodecName;
+ private final long rowGroupSize;
+ private final int pageSize;
+ private final ParquetProperties.WriterVersion writerVersion;
+
+ public ParquetExternalFilePrinterFactoryProvider(CompressionCodecName compressionCodecName, IAType typeInfo,
+ long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+ this.compressionCodecName = compressionCodecName;
+ this.typeInfo = typeInfo;
+ this.rowGroupSize = rowGroupSize;
+ this.pageSize = pageSize;
+ this.writerVersion = writerVersion;
+ }
+
+ public IExternalPrinterFactory createPrinterFactory() {
+ return new ParquetExternalFilePrinterFactory(compressionCodecName, typeInfo, rowGroupSize, pageSize,
+ writerVersion);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
new file mode 100644
index 0000000..99b9736
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Compares a value's schema against a previously inferred schema tree.
+ */
+public interface ISchemaChecker {
+    // Declaration order matters: max() below relies on Enum.compareTo, so the
+    // constants are ordered from most to least compatible.
+    enum SchemaComparisonType {
+        EQUIVALENT,
+        GROWING,
+        CONFLICTING
+    }
+
+    // Returns the "worse" (less compatible) of the two comparison results.
+    static SchemaComparisonType max(ISchemaChecker.SchemaComparisonType a, ISchemaChecker.SchemaComparisonType b) {
+        return a.compareTo(b) > 0 ? a : b;
+    }
+
+    // Classifies how the value's schema relates to schemaNode: EQUIVALENT (same),
+    // GROWING (a compatible superset), or CONFLICTING (incompatible).
+    SchemaComparisonType checkSchema(ParquetSchemaTree.SchemaNode schemaNode, IValueReference iValueReference)
+            throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index f6e85ef..373bfe4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -43,16 +43,14 @@
public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Type> {
private final MessageType schema;
- private final IAType typeInfo;
private final RecordLazyVisitablePointable rec;
private RecordConsumer recordConsumer;
- private FieldNamesDictionary fieldNamesDictionary;
+ private final FieldNamesDictionary fieldNamesDictionary;
private final ParquetValueWriter parquetValueWriter;
public ParquetRecordLazyVisitor(MessageType schema, IAType typeInfo) {
this.schema = schema;
- this.typeInfo = typeInfo;
if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
this.rec = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
} else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
new file mode 100644
index 0000000..da0cef0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class ParquetSchemaBuilderUtils {
+
+ public static Types.BaseGroupBuilder<?, ?> getGroupChild(Types.Builder parent) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalGroupElement();
+ } else {
+ return null;
+ }
+ }
+
+ public static Types.BaseListBuilder<?, ?> getListChild(Types.Builder parent) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalListElement();
+ } else {
+ return null;
+ }
+ }
+
+ public static Types.Builder<?, ?> getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type,
+ LogicalTypeAnnotation annotation) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation);
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation);
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
new file mode 100644
index 0000000..055c635
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree.buildParquetSchema;
+
+import java.util.Map;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+
+/**
+ * Lazy-pointable visitor that infers a Parquet schema from AsterixDB records.
+ * Each call to {@link #inferSchema} walks one record and merges its structure into a
+ * mutable {@link ParquetSchemaTree.SchemaNode} tree; {@link #generateSchema} then
+ * converts the accumulated tree into a Parquet {@link MessageType}.
+ * A type conflict between a record and the tree built so far raises
+ * {@link ErrorCode#RESULT_DOES_NOT_FOLLOW_SCHEMA}.
+ */
+public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
+ // Reusable pointable wrapping the record currently being visited.
+ private final RecordLazyVisitablePointable rec;
+ // Resolves field-name value references to stable column-name strings.
+ private final FieldNamesDictionary fieldNamesDictionary;
+ // Root name given to every generated Parquet message type.
+ private final static String SCHEMA_NAME = "asterix_schema";
+
+ /**
+  * @param typeInfo compile-time type of the records to visit; OBJECT gets a typed
+  *                 pointable, ANY a tagged untyped one; anything else is rejected.
+  */
+ public ParquetSchemaLazyVisitor(IAType typeInfo) {
+ this.fieldNamesDictionary = new FieldNamesDictionary();
+ if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
+ this.rec = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
+ } else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
+ this.rec = new RecordLazyVisitablePointable(true);
+ } else {
+ throw new RuntimeException("Type Unsupported for parquet printing");
+ }
+ }
+
+ /**
+  * Merges a record's fields into the schema node: missing fields are added, existing
+  * ones are recursed into. Fails if the node was previously inferred as a non-record.
+  */
+ @Override
+ public Void visit(RecordLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+ throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ // First sighting of this path: it becomes a record.
+ schemaNode.setType(new ParquetSchemaTree.RecordType());
+ }
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+ throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+ }
+ ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+ String childColumnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
+ ParquetSchemaTree.SchemaNode childType;
+ if (recordType.getChildren().containsKey(childColumnName)) {
+ childType = recordType.getChildren().get(childColumnName);
+ } else {
+ childType = new ParquetSchemaTree.SchemaNode();
+ recordType.add(childColumnName, childType);
+ }
+ child.accept(this, childType);
+ }
+ return null;
+ }
+
+ /**
+  * Merges a list's items into the single shared element node. All items of the list
+  * are folded into one child schema; heterogeneous items conflict downstream.
+  */
+ @Override
+ public Void visit(AbstractListLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+ throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ schemaNode.setType(new ParquetSchemaTree.ListType());
+ }
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+ throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+ }
+ ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+ if (listType.isEmpty()) {
+ // Allocate the shared element node on the first item seen.
+ listType.setChild(new ParquetSchemaTree.SchemaNode());
+ }
+ child.accept(this, listType.getChild());
+ }
+ return null;
+ }
+
+ /**
+  * Records a scalar's Parquet physical type + logical annotation, or verifies that a
+  * previously inferred flat type matches exactly (reference equality suffices: both
+  * sides come from the same AsterixParquetTypeMap lookup tables).
+  */
+ @Override
+ public Void visit(FlatLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+ throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ schemaNode.setType(new ParquetSchemaTree.FlatType(
+ AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(pointable.getTypeTag()),
+ AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP.get(pointable.getTypeTag())));
+ return null;
+ }
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+ throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+ }
+ ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
+ if (!(flatType.getPrimitiveTypeName() == AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP
+ .get(pointable.getTypeTag()))
+ || !(flatType.getLogicalTypeAnnotation() == AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP
+ .get(pointable.getTypeTag()))) {
+ throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+ }
+ return null;
+ }
+
+ /**
+  * Folds one serialized record into a fresh schema tree and returns its root.
+  *
+  * @param valueReference serialized record bytes
+  * @throws HyracksDataException on a schema conflict while visiting
+  */
+ public ParquetSchemaTree.SchemaNode inferSchema(IValueReference valueReference) throws HyracksDataException {
+ ParquetSchemaTree.SchemaNode schemaNode = new ParquetSchemaTree.SchemaNode();
+ rec.set(valueReference);
+ rec.accept(this, schemaNode);
+ return schemaNode;
+ }
+
+ /**
+  * Converts an inferred schema tree into a Parquet {@link MessageType} named
+  * {@value #SCHEMA_NAME}. An untyped root (no records visited) yields an empty
+  * message. Assumes a non-null root type is a RecordType — the unchecked cast below
+  * would throw ClassCastException otherwise; inferSchema only ever roots at a record.
+  */
+ public static MessageType generateSchema(ParquetSchemaTree.SchemaNode schemaRoot) throws HyracksDataException {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+ if (schemaRoot.getType() == null)
+ return builder.named(SCHEMA_NAME);
+ for (Map.Entry<String, ParquetSchemaTree.SchemaNode> entry : ((ParquetSchemaTree.RecordType) schemaRoot
+ .getType()).getChildren().entrySet()) {
+ buildParquetSchema(builder, entry.getValue(), entry.getKey());
+ }
+ return builder.named(SCHEMA_NAME);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
new file mode 100644
index 0000000..ff512df
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getGroupChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getListChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getPrimitiveChild;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+/**
+ * In-memory, mutable tree describing an inferred Parquet schema. Nodes start untyped
+ * ({@code type == null}) and are assigned a {@link RecordType}, {@link ListType} or
+ * {@link FlatType} as records are visited. {@link #buildParquetSchema} lowers a typed
+ * tree onto a parquet-mr {@link Types.Builder}.
+ */
+public class ParquetSchemaTree {
+
+ /** A single schema position; untyped until the first value for it is seen. */
+ public static class SchemaNode {
+ private AbstractType type;
+
+ public SchemaNode() {
+ type = null;
+ }
+
+ public void setType(AbstractType type) {
+ this.type = type;
+ }
+
+ /** @return the inferred type, or {@code null} if nothing was inferred yet */
+ public AbstractType getType() {
+ return type;
+ }
+ }
+
+ /** Nested-object node: named child fields, unordered (HashMap iteration order). */
+ static class RecordType extends AbstractType {
+ private final Map<String, SchemaNode> children;
+
+ public RecordType() {
+ children = new HashMap<>();
+ }
+
+ void add(String fieldName, SchemaNode childType) {
+ children.put(fieldName, childType);
+ }
+
+ SchemaNode get(String fieldName) {
+ return children.get(fieldName);
+ }
+
+ Map<String, SchemaNode> getChildren() {
+ return children;
+ }
+ }
+
+ /** Marker base class for the three node kinds. */
+ abstract static class AbstractType {
+ }
+
+ /** Scalar node: a Parquet physical type plus optional logical annotation. */
+ static class FlatType extends AbstractType {
+ private final PrimitiveType.PrimitiveTypeName primitiveTypeName;
+ private final LogicalTypeAnnotation logicalTypeAnnotation;
+
+ public FlatType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
+ LogicalTypeAnnotation logicalTypeAnnotation) {
+ this.primitiveTypeName = primitiveTypeName;
+ this.logicalTypeAnnotation = logicalTypeAnnotation;
+ }
+
+ /** @return the logical annotation; may be null for plain primitives */
+ public LogicalTypeAnnotation getLogicalTypeAnnotation() {
+ return logicalTypeAnnotation;
+ }
+
+ public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() {
+ return primitiveTypeName;
+ }
+ }
+
+ /** List node: one shared element schema for all items; empty until an item is seen. */
+ static class ListType extends AbstractType {
+ private SchemaNode child;
+
+ public ListType() {
+ child = null;
+ }
+
+ void setChild(SchemaNode child) {
+ this.child = child;
+ }
+
+ /** @return true while no element has been inferred yet */
+ boolean isEmpty() {
+ return child == null;
+ }
+
+ public SchemaNode getChild() {
+ return child;
+ }
+ }
+
+ /**
+  * Recursively appends {@code schemaNode} to {@code builder} as a field named
+  * {@code columnName}, dispatching on the node's concrete type.
+  *
+  * @throws HyracksDataException EMPTY_TYPE_INFERRED when the node never received a type
+  */
+ public static void buildParquetSchema(Types.Builder builder, SchemaNode schemaNode, String columnName)
+ throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+ }
+ AbstractType typeClass = schemaNode.getType();
+ if (typeClass instanceof RecordType) {
+ buildRecord(builder, (RecordType) schemaNode.getType(), columnName);
+ } else if (typeClass instanceof ListType) {
+ buildList(builder, (ListType) schemaNode.getType(), columnName);
+ } else if (typeClass instanceof FlatType) {
+ buildFlat(builder, (FlatType) schemaNode.getType(), columnName);
+ }
+ }
+
+ // Emits an optional group with one field per child, then closes it under columnName.
+ // NOTE(review): getGroupChild can return null for an unexpected builder kind; no
+ // null-check here — confirm only group/list builders reach this point.
+ private static void buildRecord(Types.Builder builder, RecordType type, String columnName)
+ throws HyracksDataException {
+ Types.BaseGroupBuilder<?, ?> childBuilder = getGroupChild(builder);
+ for (Map.Entry<String, SchemaNode> entry : type.getChildren().entrySet()) {
+ buildParquetSchema(childBuilder, entry.getValue(), entry.getKey());
+ }
+ childBuilder.named(columnName);
+ }
+
+ // Emits an optional list; an empty list node (no element ever seen) cannot be
+ // expressed and is reported as EMPTY_TYPE_INFERRED.
+ private static void buildList(Types.Builder builder, ListType type, String columnName) throws HyracksDataException {
+ Types.BaseListBuilder<?, ?> childBuilder = getListChild(builder);
+ SchemaNode child = type.child;
+ if (child == null) {
+ throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+ }
+ buildParquetSchema(childBuilder, child, columnName);
+ }
+
+ // Emits an optional primitive field. A null physical type means the Asterix tag had
+ // no Parquet mapping (e.g. only null/missing values were seen for this column).
+ private static void buildFlat(Types.Builder builder, FlatType type, String columnName) throws HyracksDataException {
+ if (type.getPrimitiveTypeName() == null) {
+ // NOTE(review): treating an unmappable scalar as EMPTY_TYPE_INFERRED — confirm
+ // this is the intended error code for all-null columns.
+ throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+ }
+ getPrimitiveChild(builder, type.getPrimitiveTypeName(), type.getLogicalTypeAnnotation()).named(columnName);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 206a3c9..0390315 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -74,7 +74,7 @@
recordConsumer.addDouble(value);
break;
default:
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
}
@@ -129,7 +129,7 @@
recordConsumer.addDouble(floatValue);
break;
default:
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
break;
case DOUBLE:
@@ -148,7 +148,7 @@
recordConsumer.addDouble(doubleValue);
break;
default:
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
break;
case STRING:
@@ -164,7 +164,7 @@
byteArrayOutputStream.getLength()));
} else {
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
break;
case BOOLEAN:
@@ -172,7 +172,7 @@
if (primitiveTypeName == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
recordConsumer.addBoolean(booleanValue);
} else {
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
break;
case DATE:
@@ -190,7 +190,7 @@
case MISSING:
break;
default:
- throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+ throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
new file mode 100644
index 0000000..44cd5b2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Visitor that compares a record against an already-inferred schema tree without
+ * mutating it. The result is the worst mismatch found across the whole record:
+ * EQUIVALENT (fits), GROWING (record has structure the schema lacks), or CONFLICTING
+ * (incompatible types at the same path) — combined via {@link ISchemaChecker#max}.
+ */
+public class SchemaCheckerLazyVisitor implements ISchemaChecker,
+ ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType, ParquetSchemaTree.SchemaNode> {
+ // Resolves field-name value references to stable column-name strings.
+ private final FieldNamesDictionary fieldNamesDictionary;
+ // Reusable pointable wrapping the record currently being checked.
+ private final RecordLazyVisitablePointable record;
+
+ /**
+  * @param typeInfo compile-time type of the records to check; OBJECT gets a typed
+  *                 pointable, ANY a tagged untyped one; anything else is rejected.
+  */
+ public SchemaCheckerLazyVisitor(IAType typeInfo) {
+ this.fieldNamesDictionary = new FieldNamesDictionary();
+ if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
+ this.record = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
+ } else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
+ this.record = new RecordLazyVisitablePointable(true);
+ } else {
+ throw new RuntimeException("Type Unsupported for parquet printing");
+ }
+ }
+
+ /**
+  * Compares a record against a (possibly record-typed) schema node: unknown fields
+  * mean GROWING, a non-record schema node means CONFLICTING, known fields recurse.
+  */
+ @Override
+ public ISchemaChecker.SchemaComparisonType visit(RecordLazyVisitablePointable pointable,
+ ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ return ISchemaChecker.SchemaComparisonType.GROWING;
+ }
+
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+ return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+ }
+
+ ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
+ ISchemaChecker.SchemaComparisonType schemaComparisonType = ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+ String childColumnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
+ ParquetSchemaTree.SchemaNode childType = recordType.getChildren().get(childColumnName);
+ if (childType == null) {
+ // Field not in the schema yet: schema would have to grow to admit it.
+ schemaComparisonType =
+ ISchemaChecker.max(schemaComparisonType, ISchemaChecker.SchemaComparisonType.GROWING);
+ continue;
+ }
+ schemaComparisonType = ISchemaChecker.max(schemaComparisonType, child.accept(this, childType));
+ }
+ return schemaComparisonType;
+ }
+
+ /**
+  * Compares a list's items against the schema's single element node; items seen
+  * before any element type exists mean GROWING.
+  */
+ @Override
+ public ISchemaChecker.SchemaComparisonType visit(AbstractListLazyVisitablePointable pointable,
+ ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ return ISchemaChecker.SchemaComparisonType.GROWING;
+ }
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+ return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+ }
+
+ ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
+ ISchemaChecker.SchemaComparisonType schemaComparisonType = ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+ if (listType.isEmpty()) {
+ schemaComparisonType =
+ ISchemaChecker.max(schemaComparisonType, ISchemaChecker.SchemaComparisonType.GROWING);
+ continue;
+ }
+ schemaComparisonType = ISchemaChecker.max(schemaComparisonType, child.accept(this, listType.getChild()));
+ }
+ return schemaComparisonType;
+ }
+
+ /**
+  * Compares a scalar against a flat schema node. Reference equality suffices here:
+  * both sides come from the same AsterixParquetTypeMap lookup tables.
+  */
+ @Override
+ public ISchemaChecker.SchemaComparisonType visit(FlatLazyVisitablePointable pointable,
+ ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+ if (schemaNode.getType() == null) {
+ return ISchemaChecker.SchemaComparisonType.GROWING;
+ }
+ if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+ return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+ }
+
+ ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
+
+ if (flatType.getPrimitiveTypeName() != AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(pointable.getTypeTag())
+ || !(flatType.getLogicalTypeAnnotation() == AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP
+ .get(pointable.getTypeTag()))) {
+ return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+ }
+
+ return ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+ }
+
+ /**
+  * Entry point: checks one serialized record against {@code schemaNode}.
+  *
+  * @return the worst comparison result found anywhere in the record
+  */
+ @Override
+ public SchemaComparisonType checkSchema(ParquetSchemaTree.SchemaNode schemaNode, IValueReference valueReference)
+ throws HyracksDataException {
+ record.set(valueReference);
+ return record.accept(this, schemaNode);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index b25d1f5..a6ea115 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.writer.printer.parquet;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getGroupChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getListChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getPrimitiveChild;
+
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.om.types.ARecordType;
@@ -117,35 +121,4 @@
return null;
}
- private static Types.BaseGroupBuilder getGroupChild(Types.Builder parent) {
- if (parent instanceof Types.BaseGroupBuilder) {
- return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
- } else if (parent instanceof Types.BaseListBuilder) {
- return ((Types.BaseListBuilder<?, ?>) parent).optionalGroupElement();
- } else {
- return null;
- }
- }
-
- private static Types.BaseListBuilder getListChild(Types.Builder parent) {
- if (parent instanceof Types.BaseGroupBuilder) {
- return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
- } else if (parent instanceof Types.BaseListBuilder) {
- return ((Types.BaseListBuilder<?, ?>) parent).optionalListElement();
- } else {
- return null;
- }
- }
-
- private static Types.Builder getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type,
- LogicalTypeAnnotation annotation) {
- if (parent instanceof Types.BaseGroupBuilder) {
- return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation);
- } else if (parent instanceof Types.BaseListBuilder) {
- return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation);
- } else {
- return null;
- }
- }
-
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index cc456e5..c152853 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -100,7 +100,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
@@ -110,9 +109,6 @@
import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -143,7 +139,6 @@
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -794,17 +789,9 @@
IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
throws AlgebricksException {
- String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
- IExternalFileWriterFactory fileWriterFactory =
- ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
- fileWriterFactory.validate();
- String fileExtension = ExternalWriterProvider.getFileExtension(sink);
- int maxResult = ExternalWriterProvider.getMaxResult(sink);
- IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
- ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
- fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
- SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
- partitionComparatorFactories, inputDesc, writerFactory);
+ IPushRuntimeFactory runtime = ExternalWriterProvider.getWriteFileRuntime(appCtx, sink, sourceType,
+ staticPathExpr, pathSourceLocation, dynamicPathEvalFactory, inputDesc, sourceColumn, partitionColumns,
+ partitionComparatorFactories);
return new Pair<>(runtime, null);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 73bdbb8..f58df7e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.zip.Deflater;
+import org.apache.asterix.cloud.parquet.ParquetSinkExternalWriterFactory;
import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -35,18 +36,30 @@
import org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.printer.CsvExternalFilePrinterFactory;
import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactoryProvider;
import org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
import org.apache.asterix.formats.nontagged.CSVPrinterFactoryProvider;
import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
import org.apache.asterix.metadata.declared.IExternalWriteDataSink;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.asterix.runtime.writer.PathResolverFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.WriterPartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.util.StorageUtil;
@@ -66,7 +79,7 @@
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
}
- public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
+ private static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
String staticPath, SourceLocation pathExpressionLocation) {
String adapterName = sink.getAdapterName().toLowerCase();
IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
@@ -78,14 +91,14 @@
return creator.create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
}
- public static String getFileExtension(IWriteDataSink sink) {
+ private static String getFileExtension(IWriteDataSink sink) {
Map<String, String> configuration = sink.getConfiguration();
String format = getFormat(configuration);
String compression = getCompression(configuration);
return format + (compression.isEmpty() ? "" : "." + compression);
}
- public static int getMaxResult(IWriteDataSink sink) {
+ private static int getMaxResult(IWriteDataSink sink) {
String maxResultString = sink.getConfiguration().get(ExternalDataConstants.KEY_WRITER_MAX_RESULT);
if (maxResultString == null) {
return ExternalDataConstants.WRITER_MAX_RESULT_DEFAULT;
@@ -93,6 +106,14 @@
return Integer.parseInt(maxResultString);
}
+ private static int getMaxParquetSchema(Map<String, String> conf) {
+ String maxResultString = conf.get(ExternalDataConstants.PARQUET_MAX_SCHEMAS_KEY);
+ if (maxResultString == null) {
+ return ExternalDataConstants.PARQUET_MAX_SCHEMAS_DEFAULT_VALUE;
+ }
+ return Integer.parseInt(maxResultString);
+ }
+
private static ExternalFileWriterConfiguration createConfiguration(ICcApplicationContext appCtx,
IWriteDataSink sink, String staticPath, SourceLocation pathExpressionLocation) {
Map<String, String> params = sink.getConfiguration();
@@ -117,28 +138,69 @@
CREATOR_MAP.put(adapterName.toLowerCase(), creator);
}
- public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
- Object sourceType) throws CompilationException {
+ public static IPushRuntimeFactory getWriteFileRuntime(ICcApplicationContext appCtx, IWriteDataSink sink,
+ Object sourceType, ILogicalExpression staticPathExpr, SourceLocation pathSourceLocation,
+ IScalarEvaluatorFactory dynamicPathEvalFactory, RecordDescriptor inputDesc, int sourceColumn,
+ int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories)
+ throws AlgebricksException {
+ String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
+ IExternalFileWriterFactory fileWriterFactory =
+ ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
+ fileWriterFactory.validate();
+ String fileExtension = ExternalWriterProvider.getFileExtension(sink);
+ int maxResult = ExternalWriterProvider.getMaxResult(sink);
+
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
// Check for supported formats
- if (!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
- && !ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)
- && !ExternalDataConstants.FORMAT_CSV_LOWER_CASE.equalsIgnoreCase(format)) {
+ if (!ExternalDataConstants.WRITER_SUPPORTED_FORMATS.contains(format.toLowerCase())) {
throw new UnsupportedOperationException("Unsupported format " + format);
}
String compression = getCompression(configuration);
+ WriterPartitionerFactory partitionerFactory =
+ new WriterPartitionerFactory(partitionColumns, partitionComparatorFactories);
+ PathResolverFactory pathResolverFactory = new PathResolverFactory(fileWriterFactory, fileExtension,
+ dynamicPathEvalFactory, staticPath, pathSourceLocation);
IPrinterFactory printerFactory;
IExternalFileCompressStreamFactory compressStreamFactory;
+ IExternalPrinterFactory externalPrinterFactory;
+ ExternalFileWriterFactory writerFactory;
switch (format) {
case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
- return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+ externalPrinterFactory = new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+ writerFactory = new ExternalFileWriterFactory(fileWriterFactory, externalPrinterFactory,
+ pathResolverFactory, maxResult);
+
+ return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+ partitionComparatorFactories, inputDesc, writerFactory);
+ case ExternalDataConstants.FORMAT_CSV_LOWER_CASE:
+ compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
+ if (sink instanceof IExternalWriteDataSink) {
+ ARecordType itemType = ((IExternalWriteDataSink) sink).getItemType();
+ if (itemType != null) {
+ printerFactory =
+ CSVPrinterFactoryProvider
+ .createInstance(itemType, sink.getConfiguration(),
+ ((IExternalWriteDataSink) sink).getSourceLoc())
+ .getPrinterFactory(sourceType);
+ externalPrinterFactory =
+ new CsvExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+ writerFactory = new ExternalFileWriterFactory(fileWriterFactory, externalPrinterFactory,
+ pathResolverFactory, maxResult);
+ return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+ partitionComparatorFactories, inputDesc, writerFactory);
+ } else {
+ throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+ }
+ } else {
+ throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+ }
+
case ExternalDataConstants.FORMAT_PARQUET:
- String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
CompressionCodecName compressionCodecName;
if (compression == null || compression.equals("") || compression.equals("none")) {
@@ -152,31 +214,32 @@
long rowGroupSize = StorageUtil.getByteValue(rowGroupSizeString);
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
-
ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);
- return new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
- (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
- case ExternalDataConstants.FORMAT_CSV_LOWER_CASE:
- compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
- if (sink instanceof IExternalWriteDataSink) {
- ARecordType itemType = ((IExternalWriteDataSink) sink).getItemType();
- if (itemType != null) {
- printerFactory =
- CSVPrinterFactoryProvider
- .createInstance(itemType, sink.getConfiguration(),
- ((IExternalWriteDataSink) sink).getSourceLoc())
- .getPrinterFactory(sourceType);
- return new CsvExternalFilePrinterFactory(printerFactory, compressStreamFactory);
- } else {
- throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
- }
- } else {
- throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+ if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
+ String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+ ParquetExternalFilePrinterFactory parquetPrinterFactory =
+ new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+ (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
+
+ ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory,
+ parquetPrinterFactory, pathResolverFactory, maxResult);
+ return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+ partitionComparatorFactories, inputDesc, parquetWriterFactory);
}
+
+ int maxSchemas = ExternalWriterProvider.getMaxParquetSchema(configuration);
+ ParquetExternalFilePrinterFactoryProvider printerFactoryProvider =
+ new ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType) sourceType,
+ rowGroupSize, pageSize, writerVersion);
+ return new ParquetSinkExternalWriterFactory(partitionerFactory, inputDesc, sourceColumn,
+ (IAType) sourceType, maxSchemas, fileWriterFactory, maxResult, printerFactoryProvider,
+ pathResolverFactory);
+
default:
throw new UnsupportedOperationException("Unsupported format " + format);
}
+
}
private static ParquetProperties.WriterVersion getParquetWriterVersion(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index d7a71bf..b466d0d 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -154,5 +154,9 @@
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index c6d1dbe..e3ccc33 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -32,7 +32,7 @@
import org.apache.hyracks.util.string.UTF8CharBuffer;
import org.apache.hyracks.util.string.UTF8StringUtil;
-final class DynamicPathResolver extends AbstractPathResolver {
+public final class DynamicPathResolver extends AbstractPathResolver {
private final IScalarEvaluator pathEval;
private final IWarningCollector warningCollector;
private final StringBuilder dirStringBuilder;
@@ -40,7 +40,7 @@
private final UTF8CharBuffer charBuffer;
private final SourceLocation pathSourceLocation;
- DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
+ public DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
super(fileExtension, fileSeparator, partition);
this.pathEval = pathEval;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 95dc962..e8bae00 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-final class ExternalFileWriter implements IExternalWriter {
+public final class ExternalFileWriter implements IExternalWriter {
static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
private final IPathResolver pathResolver;
private final IExternalFileWriter writer;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index 5981584..f85a8ab 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -18,52 +18,29 @@
*/
package org.apache.asterix.runtime.writer;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
public class ExternalFileWriterFactory implements IExternalWriterFactory {
private static final long serialVersionUID = 1412969574113419638L;
private final IExternalFileWriterFactory writerFactory;
private final IExternalPrinterFactory printerFactory;
- private final String fileExtension;
private final int maxResult;
- private final IScalarEvaluatorFactory pathEvalFactory;
- private final String staticPath;
- private final SourceLocation pathSourceLocation;
+ private final IPathResolverFactory pathResolverFactory;
public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
- String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
- SourceLocation pathSourceLocation) {
+ IPathResolverFactory pathResolverFactory, int maxResult) {
this.writerFactory = writerFactory;
this.printerFactory = printerFactory;
- this.fileExtension = fileExtension;
+ this.pathResolverFactory = pathResolverFactory;
this.maxResult = maxResult;
- this.pathEvalFactory = pathEvalFactory;
- this.staticPath = staticPath;
- this.pathSourceLocation = pathSourceLocation;
}
@Override
public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
- int partition = context.getTaskAttemptId().getTaskId().getPartition();
- char fileSeparator = writerFactory.getSeparator();
- IPathResolver resolver;
- if (staticPath == null) {
- EvaluatorContext evaluatorContext = new EvaluatorContext(context);
- IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
- IWarningCollector warningCollector = context.getWarningCollector();
- resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
- pathSourceLocation);
- } else {
- resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
- }
+ IPathResolver resolver = pathResolverFactory.createResolver(context);
IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
return new ExternalFileWriter(resolver, writer, maxResult);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
index 35b4ddd..c4a3b04 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -24,7 +24,7 @@
/**
* Path resolver which generates paths for the written files
*/
-interface IPathResolver {
+public interface IPathResolver {
/**
* Extract the partitioning values from the provided tuple and generates the file path
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
index b6ec758..18a8799 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
@@ -16,22 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.asterix.runtime.writer;
-USE test;
+import java.io.Serializable;
-COPY (
- select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
- "container":"playground",
- "format":"parquet"
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPathResolverFactory extends Serializable {
+
+ IPathResolver createResolver(IHyracksTaskContext ctx) throws HyracksDataException;
+
}
-
-
-
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java
new file mode 100644
index 0000000..8707cae
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class PathResolverFactory implements IPathResolverFactory {
+ private static final long serialVersionUID = 8971234908711238L;
+ private final IExternalFileWriterFactory writerFactory;
+ private final String fileExtension;
+ private final IScalarEvaluatorFactory pathEvalFactory;
+ private final String staticPath;
+ private final SourceLocation pathSourceLocation;
+
+ public PathResolverFactory(IExternalFileWriterFactory writerFactory, String fileExtension,
+ IScalarEvaluatorFactory pathEvalFactory, String staticPath, SourceLocation pathSourceLocation) {
+ this.writerFactory = writerFactory;
+ this.fileExtension = fileExtension;
+ this.pathEvalFactory = pathEvalFactory;
+ this.pathSourceLocation = pathSourceLocation;
+ this.staticPath = staticPath;
+ }
+
+ @Override
+ public IPathResolver createResolver(IHyracksTaskContext context) throws HyracksDataException {
+ int partition = context.getTaskAttemptId().getTaskId().getPartition();
+ char fileSeparator = writerFactory.getSeparator();
+ IPathResolver resolver;
+ if (staticPath == null) {
+ EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+ IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+ IWarningCollector warningCollector = context.getWarningCollector();
+ resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+ pathSourceLocation);
+ } else {
+ resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
+ }
+ return resolver;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
index 52943ed..c5fa0dd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -20,10 +20,10 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-final class StaticPathResolver extends AbstractPathResolver {
+public final class StaticPathResolver extends AbstractPathResolver {
private final String directoryPath;
- StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
+ public StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
super(fileExtension, fileSeparator, partition);
if (!directoryPath.isEmpty() && directoryPath.charAt(directoryPath.length() - 1) != fileSeparator) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
index cb72aec..7cf4d05 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -21,7 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-interface IWriterPartitioner {
+public interface IWriterPartitioner {
boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 321828f..3612511 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -23,7 +23,6 @@
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,64 +30,35 @@
public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
private static final long serialVersionUID = -2215789207336628581L;
private final int sourceColumn;
- private final int[] partitionColumns;
- private final IBinaryComparatorFactory[] partitionComparatorFactories;
- private final boolean partitionByKey;
private final RecordDescriptor inputRecordDescriptor;
private final IExternalWriterFactory writerFactory;
+ private final WriterPartitionerFactory partitionerFactory;
public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
IExternalWriterFactory writerFactory) {
- this(sourceColumn, partitionColumns, partitionComparatorFactories, false, inputRecordDescriptor, writerFactory);
+ this(sourceColumn, inputRecordDescriptor, writerFactory,
+ new WriterPartitionerFactory(partitionColumns, partitionComparatorFactories));
}
public SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor inputRecordDescriptor,
IExternalWriterFactory writerFactory) {
- this(sourceColumn, null, null, true, inputRecordDescriptor, writerFactory);
+ this(sourceColumn, inputRecordDescriptor, writerFactory, new WriterPartitionerFactory());
}
- private SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
- IBinaryComparatorFactory[] partitionComparatorFactories, boolean partitionByKey,
- RecordDescriptor inputRecordDescriptor, IExternalWriterFactory writerFactory) {
+ private SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor inputRecordDescriptor,
+ IExternalWriterFactory writerFactory, WriterPartitionerFactory partitionerFactory) {
this.sourceColumn = sourceColumn;
- this.partitionColumns = partitionColumns;
- this.partitionComparatorFactories = partitionComparatorFactories;
- this.partitionByKey = partitionByKey;
this.inputRecordDescriptor = inputRecordDescriptor;
this.writerFactory = writerFactory;
+ this.partitionerFactory = partitionerFactory;
}
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
IExternalWriter writer = writerFactory.createWriter(ctx);
- SinkExternalWriterRuntime runtime =
- new SinkExternalWriterRuntime(sourceColumn, createPartitioner(), inputRecordDescriptor, writer);
+ SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn,
+ partitionerFactory.createPartitioner(), inputRecordDescriptor, writer);
return new IPushRuntime[] { runtime };
}
-
- /**
- * Creates the writer partitioner based on the provided parameters
- *
- * @return writer partitioner
- */
- private IWriterPartitioner createPartitioner() {
- // key writer partitioner
- if (partitionByKey) {
- return KeyWriterPartitioner.INSTANCE;
- }
-
- // writer partitioner
- if (partitionColumns.length > 0) {
- IBinaryComparator[] comparators = new IBinaryComparator[partitionComparatorFactories.length];
- for (int i = 0; i < partitionComparatorFactories.length; i++) {
- comparators[i] = partitionComparatorFactories[i].createBinaryComparator();
- }
-
- return new WriterPartitioner(partitionColumns, comparators);
- }
-
- // no-op partitioner
- return new NoOpWriterPartitioner();
- }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java
new file mode 100644
index 0000000..1d5b089
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class WriterPartitionerFactory implements Serializable {
+ private static final long serialVersionUID = 8971234908711239L;
+ private final boolean partitionByKey;
+ private final int[] partitionColumns;
+ private final IBinaryComparatorFactory[] partitionComparatorFactories;
+
+ public WriterPartitionerFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories) {
+ this(partitionColumns, partitionComparatorFactories, false);
+ }
+
+ public WriterPartitionerFactory() {
+ this(null, null, true);
+ }
+
+ private WriterPartitionerFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ boolean partitionByKey) {
+ this.partitionColumns = partitionColumns;
+ this.partitionComparatorFactories = partitionComparatorFactories;
+ this.partitionByKey = partitionByKey;
+ }
+
+ /**
+ * Creates the writer partitioner based on the provided parameters
+ *
+ * @return writer partitioner
+ */
+ public IWriterPartitioner createPartitioner() {
+ // key writer partitioner
+ if (partitionByKey) {
+ return KeyWriterPartitioner.INSTANCE;
+ }
+
+ // writer partitioner
+ if (partitionColumns.length > 0) {
+ IBinaryComparator[] comparators = new IBinaryComparator[partitionComparatorFactories.length];
+ for (int i = 0; i < partitionComparatorFactories.length; i++) {
+ comparators[i] = partitionComparatorFactories[i].createBinaryComparator();
+ }
+
+ return new WriterPartitioner(partitionColumns, comparators);
+ }
+
+ // no-op partitioner
+ return new NoOpWriterPartitioner();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 02327e2..7cb107d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -160,6 +160,8 @@
FRAME_BIGGER_THAN_SORT_MEMORY(130),
RESULT_DOES_NOT_FOLLOW_SCHEMA(131),
EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
+ EMPTY_TYPE_INFERRED(133),
+ SCHEMA_LIMIT_EXCEEDED(134),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 226234f..e1fbe30 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -150,6 +150,8 @@
130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
131 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
+133 = Schema could not be inferred: empty types found in the result
+134 = Schema limit exceeded, maximum number of heterogeneous schemas allowed: '%1$s'
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s