diff --git a/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json b/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json
new file mode 100644
index 0000000..e6179d6
--- /dev/null
+++ b/asterixdb/asterix-app/data/hdfs/parquet/partition_heterogeneous.json
@@ -0,0 +1,100 @@
+{"id": 1, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 2, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 3, "partitioner_key": "C", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 4, "partitioner_key": "C", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 5, "partitioner_key": "A", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 6, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 7, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 8, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 9, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 10, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 11, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 12, "partitioner_key": "B", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 13, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 14, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 15, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 16, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 17, "partitioner_key": "B", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 18, "partitioner_key": "C", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 19, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 20, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 21, "partitioner_key": "B", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 22, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 23, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 24, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 25, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 26, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 27, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 28, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 29, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 30, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 31, "partitioner_key": "C", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 32, "partitioner_key": "B", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 33, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 34, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 35, "partitioner_key": "B", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 36, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 37, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 38, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 39, "partitioner_key": "B", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 40, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 41, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 42, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 43, "partitioner_key": "C", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 44, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 45, "partitioner_key": "A", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 46, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 47, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 48, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 49, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 50, "partitioner_key": "C", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 51, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 52, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 53, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 54, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 55, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 56, "partitioner_key": "B", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 57, "partitioner_key": "B", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 58, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 59, "partitioner_key": "A", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 60, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 61, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 62, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 63, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 64, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 65, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 66, "partitioner_key": "B", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 67, "partitioner_key": "B", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 68, "partitioner_key": "C", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 69, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 70, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 71, "partitioner_key": "A", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 72, "partitioner_key": "B", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 73, "partitioner_key": "C", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 74, "partitioner_key": "C", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 75, "partitioner_key": "A", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 76, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 77, "partitioner_key": "A", "name": false, "randomField": [1, 2, 3], "active": {"status": "active"}, "price": "one hundred"}
+{"id": 78, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 79, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 80, "partitioner_key": "A", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 81, "partitioner_key": "C", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 82, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 83, "partitioner_key": "A", "name": "None", "randomField": "5678", "active": false, "price": {"currency": "INR", "value": 99.99}}
+{"id": 84, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 85, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 86, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 87, "partitioner_key": "C", "name": ["Kohli", "Dhoni"], "randomField": 18, "active": 1, "price": {"amount": 99.99, "currency": "USD"}}
+{"id": 88, "partitioner_key": "A", "name": {"nickname": "VK"}, "randomField": 5678, "active": [true, false], "price": 100}
+{"id": 89, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 90, "partitioner_key": "C", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 91, "partitioner_key": "A", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 92, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 93, "partitioner_key": "B", "name": 3456, "randomField": {"value": 7890}, "active": true, "price": "101.50"}
+{"id": 94, "partitioner_key": "A", "name": {"first": "MS", "last": "Dhoni"}, "randomField": [9, 8, 7], "active": "no", "price": 101.5}
+{"id": 95, "partitioner_key": "B", "name": ["Dhoni", "Kohli"], "randomField": "None", "active": 0, "price": 50}
+{"id": 96, "partitioner_key": "B", "name": "Virat Kohli", "randomField": 1234, "active": true, "price": 99.99}
+{"id": 97, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 98, "partitioner_key": "B", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
+{"id": 99, "partitioner_key": "A", "name": 9876, "randomField": {"identifier": "ORD1234", "type": "order"}, "active": "active", "price": [99, 100, 101]}
+{"id": 100, "partitioner_key": "C", "name": {"first": "Virat", "last": "Kohli"}, "randomField": "1234", "active": "yes", "price": "99.99"}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 555b6b9..17cf78a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4118,19 +4118,16 @@
                         ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
                         metadataProvider));
 
-                if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
-                        .equalsIgnoreCase(ExternalDataConstants.FORMAT_PARQUET)) {
-                    if (copyTo.getType() == null) {
-                        throw new CompilationException(ErrorCode.COMPILATION_ERROR,
-                                "TYPE() Expression is required for parquet format");
+                if (ExternalDataConstants.FORMAT_PARQUET
+                        .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
+                    if (copyTo.getType() != null) {
+                        DataverseName dataverseName =
+                                DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
+                        IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+                                ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
+                        edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
+                                SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
                     }
-
-                    DataverseName dataverseName =
-                            DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
-                    IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
-                            ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
-                    edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
-                            SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
                 }
 
                 if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
index 36d00be..0bfc4eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.01.ddl.sqlpp
@@ -23,7 +23,7 @@
 
 
 CREATE TYPE ColumnType1 AS {
-  id: integer
+  id: int
 };
 
 CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
index ce68cbd..8eb14c0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
@@ -20,7 +20,7 @@
 USE test;
 
 
-insert into TestCollection({"id":1, "name": "John", "nested" : { "first" : "john" , "second":"JOHN" }  });
+insert into TestCollection({"id":2, "name": "John", "nested" : { "first" : "john" , "second":"JOHN" }  });
 
 COPY (
    select c.* from TestCollection c
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
index 281610b..f5d7eda 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
@@ -24,14 +24,15 @@
 ) toWriter
 TO S3
 PATH ("copy-to-result", "parquet-error-checks13")
-TYPE ( { id :  int,  name : string, list : [int,string]} )
+TYPE ( { id :  int,  name : string, list : [string]} )
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
     "region":"us-west-2",
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
-    "format":"parquet"
+    "format":"parquet",
+    "max-schemas" : "yvghc"
 }
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
index 1d1c6af..7628044 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
@@ -24,14 +24,15 @@
 ) toWriter
 TO S3
 PATH ("copy-to-result", "parquet-error-checks14")
-TYPE ( {id :  int, name : string, list : [int |} )
+TYPE ( {id :  int, name : string, l : int } )
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
     "region":"us-west-2",
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
-    "format":"parquet"
+    "format":"parquet",
+    "max-schemas" : "15"
 }
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
index a26624a..89bfb2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
@@ -19,12 +19,20 @@
 
 USE test;
 
+
+
+
+
+
+insert into TestCollection({"id":1, "name": []});
+
+
+
 COPY (
-   select c.* from TestCollection c
-) toWriter
+select c.* from TestCollection c
+    ) toWriter
 TO S3
 PATH ("copy-to-result", "parquet-error-checks15")
-TYPE ( {id : int, name : string, list : [int] )
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
@@ -32,7 +40,5 @@
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
     "format":"parquet"
-}
-
-
+    }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
similarity index 64%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
index b6ec758..ad4fc73 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.16.update.sqlpp
@@ -19,19 +19,34 @@
 
 USE test;
 
+
+
+
+insert into TestCollection({"id":10, "name": 1 });
+insert into TestCollection({"id":12, "name": [1] });
+insert into TestCollection({"id":15, "name": [[1]] });
+insert into TestCollection({"id":16, "name": [[[1]]] });
+insert into TestCollection({"id":17, "name": [[[[1]]]] });
+insert into TestCollection({"id":18, "name": [[[[[1]]]]] });
+insert into TestCollection({"id":19, "name": [[[[[[1]]]]]] });
+
+
+
 COPY (
-   select c.* from TestCollection c
-) toWriter
+select id,name from TestCollection c
+    ) toWriter
 TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks16")
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
     "region":"us-west-2",
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
-    "format":"parquet"
-}
+    "format":"parquet",
+    "max-schemas" : "2"
+    }
+
 
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
index b6ec758..00d5c49 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.01.ddl.sqlpp
@@ -17,21 +17,13 @@
  * under the License.
  */
 
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
+CREATE TYPE ColumnType1 AS {
+ id: int
+};
 
 
-
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp
new file mode 100644
index 0000000..308d3b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.02.update.sqlpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+
+-- INSERT INTO TestCollection( {"id":2,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":5,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":8,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":10,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":12,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":15,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":17,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":20,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":21,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":27,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":28,"name":{"first":"virat"}});
+
+
+insert into TestCollection({"id":2});
+insert into TestCollection({"id":5,"name":"virat"});
+insert into TestCollection({"id":8,"name":{"first":"virat"}});
+insert into TestCollection({"id":10,"name":{"first":"virat"},"age":18});
+insert into TestCollection({"id":12,"name":123});
+insert into TestCollection({"id":15,"name":[123,456]});
+insert into TestCollection({"id":17,"name":765});
+insert into TestCollection({"id":20,"name":[789]});
+insert into TestCollection({"id":21,"name":[{"first":"virat"}]});
+insert into TestCollection({"id":27,"name":[{"first":"virat","second":"kohli"}]});
+insert into TestCollection({"id":28,"name":{"first":"virat"}});
+
+
+
+-- INSERT INTO TestCollection( {"id":1,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":2,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":3,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":4,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":5,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":6,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":7,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":8,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":9,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":10,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":11,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":12,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":13,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":14,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":15,"name":{"first":"virat"}});
+-- INSERT INTO TestCollection( {"id":16,"name":{"first":"virat"}});
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
similarity index 94%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
index b6ec758..a7e6e16 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.03.update.sqlpp
@@ -23,7 +23,7 @@
    select c.* from TestCollection c
 ) toWriter
 TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-heterogeneous")
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
@@ -31,7 +31,4 @@
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
     "format":"parquet"
-}
-
-
-
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
index b6ec758..95f75c6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.04.ddl.sqlpp
@@ -19,19 +19,19 @@
 
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
+
+CREATE TYPE ColumnType2 AS {
+};
 
 
 
+CREATE EXTERNAL DATASET TestDataset(ColumnType2) USING S3
+(
+  ("serviceEndpoint"="http://127.0.0.1:8001"),
+  ("region"="us-west-2"),
+  ("container"="playground"),
+  ("definition"="copy-to-result/parquet-heterogeneous/"),
+  ("include"="*.parquet"),
+  ("requireVersionChangeDetection"="false"),
+  ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
index b6ec758..3d26c18 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.query.sqlpp
@@ -19,19 +19,7 @@
 
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
 
-
-
+SELECT c.*
+FROM TestDataset c
+ORDER BY c.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
index b6ec758..2155b4e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.01.ddl.sqlpp
@@ -17,21 +17,19 @@
  * under the License.
  */
 
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
+CREATE TYPE ColumnType1 AS {
+    id: int
+    };
+
+CREATE DATASET ParitionParquetDataset(ColumnType1)
+PRIMARY KEY id WITH {
+    "storage-format": {"format" : "column"}
+};
 
 
-
+CREATE TYPE ColumnType2 AS {
+    };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
index b6ec758..4de223e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.02.update.sqlpp
@@ -19,19 +19,8 @@
 
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
-
-
-
+LOAD DATASET ParitionParquetDataset USING localfs
+(
+    ("path" = "asterix_nc1://data/hdfs/parquet/partition_heterogeneous.json"),
+    ("format" = "json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
similarity index 78%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
index b6ec758..948ba17 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.03.update.sqlpp
@@ -20,18 +20,20 @@
 USE test;
 
 COPY (
-   select c.* from TestCollection c
-) toWriter
+SELECT t.* FROM ParitionParquetDataset t
+    ) toWriter
 TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-partition-heterogeneous" , partitioner_key , "random" )
+OVER ( PARTITION BY toWriter.partitioner_key AS partitioner_key )
 WITH {
     "accessKeyId":"dummyAccessKey",
     "secretAccessKey":"dummySecretKey",
     "region":"us-west-2",
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
-    "format":"parquet"
-}
+    "format":"parquet",
+    "max-schemas" : "10"
+    };
 
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
index b6ec758..c626270 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.04.ddl.sqlpp
@@ -19,19 +19,16 @@
 
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
 
 
 
+CREATE EXTERNAL DATASET ParitionParquetDatasetCopy(ColumnType2) USING S3
+(
+  ("serviceEndpoint"="http://127.0.0.1:8001"),
+  ("region"="us-west-2"),
+  ("container"="playground"),
+  ("definition"="copy-to-result/parquet-partition-heterogeneous/"),
+  ("include"="*.parquet"),
+  ("requireVersionChangeDetection"="false"),
+  ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
similarity index 71%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
index b6ec758..7009f98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.query.sqlpp
@@ -19,19 +19,9 @@
 
 USE test;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
-}
 
+SELECT c.*
+FROM ParitionParquetDatasetCopy c
+ORDER BY c.id;
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm
new file mode 100644
index 0000000..4ecacdb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-heterogeneous/parquet-heterogeneous.05.adm
@@ -0,0 +1,11 @@
+{ "id": 2 }
+{ "name": "virat", "id": 5 }
+{ "name": { "first": "virat" }, "id": 8 }
+{ "name": { "first": "virat" }, "id": 10, "age": 18 }
+{ "name": 123, "id": 12 }
+{ "name": [ 123, 456 ], "id": 15 }
+{ "name": 765, "id": 17 }
+{ "name": [ 789 ], "id": 20 }
+{ "name": [ { "first": "virat" } ], "id": 21 }
+{ "name": [ { "first": "virat", "second": "kohli" } ], "id": 27 }
+{ "name": { "first": "virat" }, "id": 28 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm
new file mode 100644
index 0000000..6b73640
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-partition-heterogeneous/parquet-partition-heterogeneous.05.adm
@@ -0,0 +1,100 @@
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 1, "randomField": 5678 }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 2, "randomField": 18 }
+{ "partitioner_key": "C", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 3, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 4, "randomField": 5678 }
+{ "partitioner_key": "A", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 5, "randomField": "1234" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 6, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 7, "randomField": 18 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 8, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 9, "randomField": 18 }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 10, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 11, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 12, "randomField": "5678" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 13, "randomField": "None" }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 14, "randomField": 5678 }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 15, "randomField": 18 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 16, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 17, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 18, "randomField": 5678 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 19, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 20, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 21, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 22, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 23, "randomField": 18 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 24, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 25, "randomField": "None" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 26, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 27, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 28, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 29, "randomField": "1234" }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 30, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 31, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 32, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 33, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 34, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 35, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 36, "randomField": "None" }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 37, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 38, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 39, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 40, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 41, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 42, "randomField": "5678" }
+{ "partitioner_key": "C", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 43, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 44, "randomField": 18 }
+{ "partitioner_key": "A", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 45, "randomField": 1234 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 46, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 47, "randomField": "None" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 48, "randomField": "None" }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 49, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 50, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 51, "randomField": "None" }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 52, "randomField": "None" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 53, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 54, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 55, "randomField": "5678" }
+{ "partitioner_key": "B", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 56, "randomField": 1234 }
+{ "partitioner_key": "B", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 57, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 58, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 59, "randomField": "None" }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 60, "randomField": 18 }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 61, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 62, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 63, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 64, "randomField": 18 }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 65, "randomField": 5678 }
+{ "partitioner_key": "B", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 66, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 67, "randomField": "5678" }
+{ "partitioner_key": "C", "price": "101.50", "name": 3456, "active": true, "id": 68, "randomField": { "value": 7890 } }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 69, "randomField": "None" }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 70, "randomField": 18 }
+{ "partitioner_key": "A", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 71, "randomField": 18 }
+{ "partitioner_key": "B", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 72, "randomField": 18 }
+{ "partitioner_key": "C", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 73, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "C", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 74, "randomField": 1234 }
+{ "partitioner_key": "A", "price": "101.50", "name": 3456, "active": true, "id": 75, "randomField": { "value": 7890 } }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 76, "randomField": "None" }
+{ "partitioner_key": "A", "price": "one hundred", "name": false, "active": { "status": "active" }, "id": 77, "randomField": [ 1, 2, 3 ] }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 78, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 79, "randomField": "1234" }
+{ "partitioner_key": "A", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 80, "randomField": "1234" }
+{ "partitioner_key": "C", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 81, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 82, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "A", "price": { "currency": "INR", "value": 99.99 }, "name": "None", "active": false, "id": 83, "randomField": "5678" }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 84, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 85, "randomField": "None" }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 86, "randomField": "1234" }
+{ "partitioner_key": "C", "price": { "amount": 99.99, "currency": "USD" }, "name": [ "Kohli", "Dhoni" ], "active": 1, "id": 87, "randomField": 18 }
+{ "partitioner_key": "A", "price": 100, "name": { "nickname": "VK" }, "active": [ true, false ], "id": 88, "randomField": 5678 }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 89, "randomField": "1234" }
+{ "partitioner_key": "C", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 90, "randomField": "None" }
+{ "partitioner_key": "A", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 91, "randomField": 1234 }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 92, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": "101.50", "name": 3456, "active": true, "id": 93, "randomField": { "value": 7890 } }
+{ "partitioner_key": "A", "price": 101.5, "name": { "last": "Dhoni", "first": "MS" }, "active": "no", "id": 94, "randomField": [ 9, 8, 7 ] }
+{ "partitioner_key": "B", "price": 50, "name": [ "Dhoni", "Kohli" ], "active": 0, "id": 95, "randomField": "None" }
+{ "partitioner_key": "B", "price": 99.99, "name": "Virat Kohli", "active": true, "id": 96, "randomField": 1234 }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 97, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "B", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 98, "randomField": "1234" }
+{ "partitioner_key": "A", "price": [ 99, 100, 101 ], "name": 9876, "active": "active", "id": 99, "randomField": { "identifier": "ORD1234", "type": "order" } }
+{ "partitioner_key": "C", "price": "99.99", "name": { "last": "Kohli", "first": "Virat" }, "active": "yes", "id": 100, "randomField": "1234" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 80628ab..ecbd4b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -50,11 +50,21 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="copy-to">
+      <compilation-unit name="parquet-partition-heterogeneous">
+        <output-dir compare="Text">parquet-partition-heterogeneous</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="copy-to">
       <compilation-unit name="parquet-utf8">
         <output-dir compare="Text">parquet-utf8</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="copy-to">
+      <compilation-unit name="parquet-heterogeneous">
+        <output-dir compare="Text">parquet-heterogeneous</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="copy-to">
       <compilation-unit name="parquet-cover-data-types">
         <output-dir compare="Text">parquet-cover-data-types</output-dir>
       </compilation-unit>
@@ -112,8 +122,7 @@
     <test-case FilePath="copy-to/negative">
       <compilation-unit name="parquet-error-checks">
         <output-dir compare="Text">parquet-error-checks</output-dir>
-        <expected-error>ASX1079: Compilation error: TYPE() Expression is required for parquet format</expected-error>
-        <expected-error>ASX0037: Type mismatch: expected value of type integer, but got the value of type BINARY</expected-error>
+        <expected-error>ASX0037: Type mismatch: expected value of type BINARY, but got the value of type bigint</expected-error>
         <expected-error>HYR0132: Extra field in the result, field 'second' does not exist at 'nested' in the schema</expected-error>
         <expected-error>HYR0131: Result does not follow the schema, group type expected but found primitive type at 'nested'</expected-error>
         <expected-error>HYR0131: Result does not follow the schema, primitive type expected but found group type at 'name'</expected-error>
@@ -123,9 +132,10 @@
         <expected-error>ASX1001: Syntax error</expected-error>
         <expected-error>ASX1204: 'binary' type not supported in parquet format</expected-error>
         <expected-error>ASX1205: Invalid Parquet Writer Version provided '3'. Supported values: [1, 2]</expected-error>
-        <expected-error>ASX1001: Syntax error</expected-error>
-        <expected-error>ASX1001: Syntax error</expected-error>
-        <expected-error>ASX1001: Syntax error</expected-error>
+        <expected-error>ASX0039: Expected integer value, got yvghc (in line 22, at column 6)</expected-error>
+        <expected-error>ASX1209: Maximum value allowed for 'max-schemas' is 10. Found 15</expected-error>
+        <expected-error>HYR0133: Schema could not be inferred, empty types found in the result</expected-error>
+        <expected-error>HYR0134: Schema Limit exceeded, maximum number of heterogeneous schemas allowed : '2'</expected-error>
         </compilation-unit>
     </test-case>
     <test-case FilePath="copy-to/negative">
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
new file mode 100644
index 0000000..d754068
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor.generateSchema;
+
+import java.io.Serializable;
+
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree;
+import org.apache.asterix.runtime.writer.ExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IPathResolver;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetExternalWriterFactory implements Serializable {
+
+    private static final long serialVersionUID = 8971234908711236L;
+    private final IExternalFileWriterFactory writerFactory;
+    private final int maxResult;
+    private final ParquetExternalFilePrinterFactory printerFactory;
+    private final IPathResolver resolver;
+    private final IHyracksTaskContext ctx;
+
+    public ParquetExternalWriterFactory(IHyracksTaskContext ctx, IExternalFileWriterFactory writerFactory,
+            int maxResult, ParquetExternalFilePrinterFactory printerFactory, IPathResolver resolver) {
+        this.ctx = ctx;
+        this.writerFactory = writerFactory;
+        this.maxResult = maxResult;
+        this.printerFactory = printerFactory;
+        this.resolver = resolver;
+    }
+
+    public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+        MessageType schema = generateSchema(schemaNode);
+        printerFactory.setParquetSchemaString(schema.toString());
+        IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory);
+        return new ExternalFileWriter(resolver, writer, maxResult);
+    }
+
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
new file mode 100644
index 0000000..b500cbe
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.writer.printer.parquet.ISchemaChecker;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor;
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ParquetSchemaInferPoolWriter {
+    private final ParquetExternalWriterFactory writerFactory;
+
+    private List<ParquetSchemaTree.SchemaNode> schemaNodes;
+    private List<IExternalWriter> writerList;
+    private final int maxSchemas;
+    private ISchemaChecker schemaChecker;
+    private ParquetSchemaLazyVisitor schemaLazyVisitor;
+
+    public ParquetSchemaInferPoolWriter(ParquetExternalWriterFactory writerFactory, ISchemaChecker schemaChecker,
+            ParquetSchemaLazyVisitor parquetSchemaLazyVisitor, int maxSchemas) {
+        this.writerFactory = writerFactory;
+        this.schemaChecker = schemaChecker;
+        this.schemaLazyVisitor = parquetSchemaLazyVisitor;
+        this.maxSchemas = maxSchemas;
+        this.schemaNodes = new ArrayList<>();
+        this.writerList = new ArrayList<>();
+    }
+
+    public void inferSchema(IValueReference value) throws HyracksDataException {
+        for (int i = 0; i < schemaNodes.size(); i++) {
+            ISchemaChecker.SchemaComparisonType schemaComparisonType =
+                    schemaChecker.checkSchema(schemaNodes.get(i), value);
+
+            if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
+                return;
+            } else if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) {
+                schemaNodes.set(i, schemaLazyVisitor.inferSchema(value));
+                closeWriter(i);
+                return;
+            }
+        }
+
+        if (schemaNodes.size() == maxSchemas) {
+            throw new HyracksDataException(ErrorCode.SCHEMA_LIMIT_EXCEEDED, maxSchemas);
+        }
+        schemaNodes.add(schemaLazyVisitor.inferSchema(value));
+        writerList.add(null);
+    }
+
+    public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+        for (int i = 0; i < writerList.size(); i++) {
+            createOrInitPartition(i, tuple);
+        }
+    }
+
+    public void write(IValueReference value) throws HyracksDataException {
+        for (int i = 0; i < schemaNodes.size(); i++) {
+            if (schemaChecker.checkSchema(schemaNodes.get(i), value)
+                    .equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
+                createOrWrite(i, value);
+                return;
+            }
+        }
+    }
+
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < writerList.size(); i++) {
+            closeWriter(i);
+        }
+    }
+
+    private void createOrInitPartition(int index, IFrameTupleReference tupleReference) throws HyracksDataException {
+        if (writerList.get(index) == null) {
+            createWriter(index);
+        }
+        writerList.get(index).initNewPartition(tupleReference);
+    }
+
+    private void createOrWrite(int index, IValueReference value) throws HyracksDataException {
+        if (writerList.get(index) == null) {
+            createWriter(index);
+        }
+        writerList.get(index).write(value);
+    }
+
+    private void createWriter(int index) throws HyracksDataException {
+        writerList.set(index, writerFactory.createWriter(schemaNodes.get(index)));
+        writerList.get(index).open();
+    }
+
+    private void closeWriter(int index) throws HyracksDataException {
+        if (writerList.get(index) != null) {
+            writerList.get(index).close();
+            writerList.set(index, null);
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java
new file mode 100644
index 0000000..3a276a2
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactoryProvider;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IPathResolverFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.WriterPartitionerFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ParquetSinkExternalWriterFactory extends AbstractPushRuntimeFactory {
+    private static final long serialVersionUID = -1285997525553685225L;
+    private final WriterPartitionerFactory partitionerFactory;
+    private final RecordDescriptor inputRecordDesc;
+    private final int sourceColumn;
+    private final int maxSchemas;
+    private final IAType sourceType;
+    private final IExternalFileWriterFactory writerFactory;
+    private final int maxResult;
+    private final ParquetExternalFilePrinterFactoryProvider printerFactoryProvider;
+    private final IPathResolverFactory pathResolverFactory;
+
+    public ParquetSinkExternalWriterFactory(WriterPartitionerFactory partitionerFactory,
+            RecordDescriptor inputRecordDesc, int sourceColumn, IAType sourceType, int maxSchemas,
+            IExternalFileWriterFactory writerFactory, int maxResult,
+            ParquetExternalFilePrinterFactoryProvider printerFactoryProvider,
+            IPathResolverFactory pathResolverFactory) {
+        this.partitionerFactory = partitionerFactory;
+        this.inputRecordDesc = inputRecordDesc;
+        this.sourceColumn = sourceColumn;
+        this.sourceType = sourceType;
+        this.maxSchemas = maxSchemas;
+        this.writerFactory = writerFactory;
+        this.maxResult = maxResult;
+        this.printerFactoryProvider = printerFactoryProvider;
+        this.pathResolverFactory = pathResolverFactory;
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        ParquetExternalFilePrinterFactory printerFactory =
+                (ParquetExternalFilePrinterFactory) printerFactoryProvider.createPrinterFactory();
+        ParquetExternalWriterFactory parquetExternalWriterFactory = new ParquetExternalWriterFactory(ctx, writerFactory,
+                maxResult, printerFactory, pathResolverFactory.createResolver(ctx));
+        ParquetSinkExternalWriterRuntime runtime =
+                new ParquetSinkExternalWriterRuntime(sourceColumn, partitionerFactory.createPartitioner(),
+                        inputRecordDesc, parquetExternalWriterFactory, sourceType, maxSchemas);
+        return new IPushRuntime[] { runtime };
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
new file mode 100644
index 0000000..3dbd4d3
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.parquet;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.writer.printer.parquet.ParquetSchemaLazyVisitor;
+import org.apache.asterix.external.writer.printer.parquet.SchemaCheckerLazyVisitor;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.writer.IWriterPartitioner;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ParquetSinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+    private final int sourceColumn;
+    private final IWriterPartitioner partitioner;
+    private final IPointable sourceValue;
+    private final ParquetExternalWriterFactory writerFactory;
+    private FrameTupleAccessor tupleAccessor;
+    private FrameTupleReference tupleRef;
+    private IFrameWriter frameWriter;
+    private final int maxSchemas;
+    private final IAType sourceType;
+    private ParquetSchemaInferPoolWriter poolWriter;
+
+    public ParquetSinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner partitioner,
+            RecordDescriptor inputRecordDesc, ParquetExternalWriterFactory writerFactory, IAType sourceType,
+            int maxSchemas) {
+        this.sourceColumn = sourceColumn;
+        this.partitioner = partitioner;
+        this.sourceValue = new VoidPointable();
+        this.inputRecordDesc = inputRecordDesc;
+        this.writerFactory = writerFactory;
+        this.sourceType = sourceType;
+        this.maxSchemas = maxSchemas;
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter frameWriter, RecordDescriptor recordDesc) {
+        this.frameWriter = frameWriter;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (tupleAccessor == null) {
+            tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
+            tupleRef = new FrameTupleReference();
+        }
+
+        poolWriter = new ParquetSchemaInferPoolWriter(writerFactory, new SchemaCheckerLazyVisitor(sourceType),
+                new ParquetSchemaLazyVisitor(sourceType), maxSchemas);
+        this.frameWriter.open();
+
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tupleAccessor.reset(buffer);
+
+        for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+            tupleRef.reset(tupleAccessor, i);
+            setValue(tupleRef, sourceColumn, sourceValue);
+            poolWriter.inferSchema(sourceValue);
+        }
+
+        for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+            tupleRef.reset(tupleAccessor, i);
+            setValue(tupleRef, sourceColumn, sourceValue);
+            if (partitioner.isNewPartition(tupleAccessor, i)) {
+                poolWriter.initNewPartition(tupleRef);
+            }
+            poolWriter.write(sourceValue);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        frameWriter.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        poolWriter.close();
+        frameWriter.close();
+    }
+
+    private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
+        byte[] data = tuple.getFieldData(column);
+        int start = tuple.getFieldStart(column);
+        int length = tuple.getFieldLength(column);
+        value.set(data, start, length);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 550015d..b955fa2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -311,6 +311,8 @@
     ILLEGAL_SIZE_PROVIDED(1206),
     TYPE_UNSUPPORTED_CSV_WRITE(1207),
     INVALID_CSV_SCHEMA(1208),
+    MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
+
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
     UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE(3002),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 2350c2c..e450657 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -313,6 +313,7 @@
 1206 = Storage units expected for the field '%1$s' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided '%2$s'
 1207 = '%1$s' type not supported in csv format
 1208 = Invalid Copy to CSV schema
+1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 18d5158..1329a29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -351,6 +351,9 @@
             List.of(ATypeTag.TINYINT, ATypeTag.SMALLINT, ATypeTag.INTEGER, ATypeTag.BIGINT, ATypeTag.UINT8,
                     ATypeTag.UINT16, ATypeTag.UINT64, ATypeTag.FLOAT, ATypeTag.DOUBLE, ATypeTag.STRING,
                     ATypeTag.BOOLEAN, ATypeTag.DATETIME, ATypeTag.UINT32, ATypeTag.DATE, ATypeTag.TIME);
+    public static final String PARQUET_MAX_SCHEMAS_KEY = "max-schemas";
+    public static final int PARQUET_MAX_SCHEMAS_DEFAULT_VALUE = 5;
+    public static final int PARQUET_MAX_SCHEMAS_MAX_VALUE = 10;
 
     static {
         WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, FORMAT_PARQUET, FORMAT_CSV_LOWER_CASE);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 3de4067..380a9a8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.util;
 
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.common.exceptions.ErrorCode.MAXIMUM_VALUE_ALLOWED_FOR_PARAM;
 import static org.apache.asterix.common.exceptions.ErrorCode.MINIMUM_VALUE_ALLOWED_FOR_PARAM;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
 import static org.apache.asterix.external.util.ExternalDataConstants.FORMAT_CSV;
@@ -27,6 +28,8 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_PAGE_SIZE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_WRITER_MAX_RESULT;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_MAX_SCHEMAS_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_MAX_SCHEMAS_MAX_VALUE;
 import static org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.WRITER_MAX_RESULT_MINIMUM;
 
@@ -91,6 +94,7 @@
         validateParquetRowGroupSize(configuration);
         validateParquetPageSize(configuration);
         validateVersion(configuration, sourceLocation);
+        validateMaxParquetSchemas(configuration, sourceLocation);
     }
 
     private static void validateVersion(Map<String, String> configuration, SourceLocation sourceLocation)
@@ -175,6 +179,24 @@
         }
     }
 
+    private static void validateMaxParquetSchemas(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String maxResult = configuration.get(PARQUET_MAX_SCHEMAS_KEY);
+        if (maxResult == null) {
+            return;
+        }
+
+        try {
+            int value = Integer.parseInt(maxResult);
+            if (value > PARQUET_MAX_SCHEMAS_MAX_VALUE) {
+                throw new CompilationException(MAXIMUM_VALUE_ALLOWED_FOR_PARAM, PARQUET_MAX_SCHEMAS_KEY,
+                        PARQUET_MAX_SCHEMAS_MAX_VALUE, value);
+            }
+        } catch (NumberFormatException e) {
+            throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, maxResult);
+        }
+    }
+
     private static void checkSupported(String paramKey, String value, Set<String> supportedSet, ErrorCode errorCode,
             SourceLocation sourceLocation, boolean optional) throws CompilationException {
         if (optional && value == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index 5ccd2fe..b6ad34e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -25,8 +25,8 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory {
-    private static final long serialVersionUID = 8971234908711234L;
-    private final String parquetSchemaString;
+    private static final long serialVersionUID = 8971234908711235L;
+    private String parquetSchemaString;
     private final IAType typeInfo;
     private final CompressionCodecName compressionCodecName;
     private final long rowGroupSize;
@@ -43,6 +43,19 @@
         this.writerVersion = writerVersion;
     }
 
+    public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, IAType typeInfo,
+            long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+        this.compressionCodecName = compressionCodecName;
+        this.typeInfo = typeInfo;
+        this.rowGroupSize = rowGroupSize;
+        this.pageSize = pageSize;
+        this.writerVersion = writerVersion;
+    }
+
+    public void setParquetSchemaString(String parquetSchemaString) {
+        this.parquetSchemaString = parquetSchemaString;
+    }
+
     @Override
     public IExternalPrinter createPrinter() {
         return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java
new file mode 100644
index 0000000..bd70853
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactoryProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.Serializable;
+
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class ParquetExternalFilePrinterFactoryProvider implements Serializable {
+    private static final long serialVersionUID = 8971234908711237L;
+    private final IAType typeInfo;
+    private final CompressionCodecName compressionCodecName;
+    private final long rowGroupSize;
+    private final int pageSize;
+    private final ParquetProperties.WriterVersion writerVersion;
+
+    public ParquetExternalFilePrinterFactoryProvider(CompressionCodecName compressionCodecName, IAType typeInfo,
+            long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+        this.compressionCodecName = compressionCodecName;
+        this.typeInfo = typeInfo;
+        this.rowGroupSize = rowGroupSize;
+        this.pageSize = pageSize;
+        this.writerVersion = writerVersion;
+    }
+
+    public IExternalPrinterFactory createPrinterFactory() {
+        return new ParquetExternalFilePrinterFactory(compressionCodecName, typeInfo, rowGroupSize, pageSize,
+                writerVersion);
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
new file mode 100644
index 0000000..99b9736
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public interface ISchemaChecker {
+    enum SchemaComparisonType {
+        EQUIVALENT,
+        GROWING,
+        CONFLICTING
+    }
+
+    static SchemaComparisonType max(ISchemaChecker.SchemaComparisonType a, ISchemaChecker.SchemaComparisonType b) {
+        return a.compareTo(b) > 0 ? a : b;
+    }
+
+    SchemaComparisonType checkSchema(ParquetSchemaTree.SchemaNode schemaNode, IValueReference iValueReference)
+            throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index f6e85ef..373bfe4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -43,16 +43,14 @@
 public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Type> {
 
     private final MessageType schema;
-    private final IAType typeInfo;
     private final RecordLazyVisitablePointable rec;
     private RecordConsumer recordConsumer;
-    private FieldNamesDictionary fieldNamesDictionary;
+    private final FieldNamesDictionary fieldNamesDictionary;
 
     private final ParquetValueWriter parquetValueWriter;
 
     public ParquetRecordLazyVisitor(MessageType schema, IAType typeInfo) {
         this.schema = schema;
-        this.typeInfo = typeInfo;
         if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
             this.rec = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
         } else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
new file mode 100644
index 0000000..da0cef0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class ParquetSchemaBuilderUtils {
+
+    public static Types.BaseGroupBuilder<?, ?> getGroupChild(Types.Builder parent) {
+        if (parent instanceof Types.BaseGroupBuilder) {
+            return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
+        } else if (parent instanceof Types.BaseListBuilder) {
+            return ((Types.BaseListBuilder<?, ?>) parent).optionalGroupElement();
+        } else {
+            return null;
+        }
+    }
+
+    public static Types.BaseListBuilder<?, ?> getListChild(Types.Builder parent) {
+        if (parent instanceof Types.BaseGroupBuilder) {
+            return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
+        } else if (parent instanceof Types.BaseListBuilder) {
+            return ((Types.BaseListBuilder<?, ?>) parent).optionalListElement();
+        } else {
+            return null;
+        }
+    }
+
+    public static Types.Builder<?, ?> getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type,
+            LogicalTypeAnnotation annotation) {
+        if (parent instanceof Types.BaseGroupBuilder) {
+            return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation);
+        } else if (parent instanceof Types.BaseListBuilder) {
+            return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation);
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
new file mode 100644
index 0000000..055c635
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaTree.buildParquetSchema;
+
+import java.util.Map;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+
+public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
+    private final RecordLazyVisitablePointable rec;
+    private final FieldNamesDictionary fieldNamesDictionary;
+    private final static String SCHEMA_NAME = "asterix_schema";
+
+    public ParquetSchemaLazyVisitor(IAType typeInfo) {
+        this.fieldNamesDictionary = new FieldNamesDictionary();
+        if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
+            this.rec = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
+        } else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
+            this.rec = new RecordLazyVisitablePointable(true);
+        } else {
+            throw new RuntimeException("Type Unsupported for parquet printing");
+        }
+    }
+
+    @Override
+    public Void visit(RecordLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+            throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            schemaNode.setType(new ParquetSchemaTree.RecordType());
+        }
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+            throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+        }
+        ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            String childColumnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
+            ParquetSchemaTree.SchemaNode childType;
+            if (recordType.getChildren().containsKey(childColumnName)) {
+                childType = recordType.getChildren().get(childColumnName);
+            } else {
+                childType = new ParquetSchemaTree.SchemaNode();
+                recordType.add(childColumnName, childType);
+            }
+            child.accept(this, childType);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visit(AbstractListLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+            throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            schemaNode.setType(new ParquetSchemaTree.ListType());
+        }
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+            throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+        }
+        ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            if (listType.isEmpty()) {
+                listType.setChild(new ParquetSchemaTree.SchemaNode());
+            }
+            child.accept(this, listType.getChild());
+        }
+        return null;
+    }
+
+    @Override
+    public Void visit(FlatLazyVisitablePointable pointable, ParquetSchemaTree.SchemaNode schemaNode)
+            throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            schemaNode.setType(new ParquetSchemaTree.FlatType(
+                    AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(pointable.getTypeTag()),
+                    AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP.get(pointable.getTypeTag())));
+            return null;
+        }
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+            throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+        }
+        ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
+        if (!(flatType.getPrimitiveTypeName() == AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP
+                .get(pointable.getTypeTag()))
+                || !(flatType.getLogicalTypeAnnotation() == AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP
+                        .get(pointable.getTypeTag()))) {
+            throw new HyracksDataException(ErrorCode.RESULT_DOES_NOT_FOLLOW_SCHEMA);
+        }
+        return null;
+    }
+
+    public ParquetSchemaTree.SchemaNode inferSchema(IValueReference valueReference) throws HyracksDataException {
+        ParquetSchemaTree.SchemaNode schemaNode = new ParquetSchemaTree.SchemaNode();
+        rec.set(valueReference);
+        rec.accept(this, schemaNode);
+        return schemaNode;
+    }
+
+    public static MessageType generateSchema(ParquetSchemaTree.SchemaNode schemaRoot) throws HyracksDataException {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+        if (schemaRoot.getType() == null)
+            return builder.named(SCHEMA_NAME);
+        for (Map.Entry<String, ParquetSchemaTree.SchemaNode> entry : ((ParquetSchemaTree.RecordType) schemaRoot
+                .getType()).getChildren().entrySet()) {
+            buildParquetSchema(builder, entry.getValue(), entry.getKey());
+        }
+        return builder.named(SCHEMA_NAME);
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
new file mode 100644
index 0000000..ff512df
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaTree.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getGroupChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getListChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getPrimitiveChild;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class ParquetSchemaTree {
+
+    public static class SchemaNode {
+        private AbstractType type;
+
+        public SchemaNode() {
+            type = null;
+        }
+
+        public void setType(AbstractType type) {
+            this.type = type;
+        }
+
+        public AbstractType getType() {
+            return type;
+        }
+    }
+
+    static class RecordType extends AbstractType {
+        private final Map<String, SchemaNode> children;
+
+        public RecordType() {
+            children = new HashMap<>();
+        }
+
+        void add(String fieldName, SchemaNode childType) {
+            children.put(fieldName, childType);
+        }
+
+        SchemaNode get(String fieldName) {
+            return children.get(fieldName);
+        }
+
+        Map<String, SchemaNode> getChildren() {
+            return children;
+        }
+    }
+
+    abstract static class AbstractType {
+    }
+
+    static class FlatType extends AbstractType {
+        private final PrimitiveType.PrimitiveTypeName primitiveTypeName;
+        private final LogicalTypeAnnotation logicalTypeAnnotation;
+
+        public FlatType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
+                LogicalTypeAnnotation logicalTypeAnnotation) {
+            this.primitiveTypeName = primitiveTypeName;
+            this.logicalTypeAnnotation = logicalTypeAnnotation;
+        }
+
+        public LogicalTypeAnnotation getLogicalTypeAnnotation() {
+            return logicalTypeAnnotation;
+        }
+
+        public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() {
+            return primitiveTypeName;
+        }
+    }
+
+    static class ListType extends AbstractType {
+        private SchemaNode child;
+
+        public ListType() {
+            child = null;
+        }
+
+        void setChild(SchemaNode child) {
+            this.child = child;
+        }
+
+        boolean isEmpty() {
+            return child == null;
+        }
+
+        public SchemaNode getChild() {
+            return child;
+        }
+    }
+
+    public static void buildParquetSchema(Types.Builder builder, SchemaNode schemaNode, String columnName)
+            throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+        }
+        AbstractType typeClass = schemaNode.getType();
+        if (typeClass instanceof RecordType) {
+            buildRecord(builder, (RecordType) schemaNode.getType(), columnName);
+        } else if (typeClass instanceof ListType) {
+            buildList(builder, (ListType) schemaNode.getType(), columnName);
+        } else if (typeClass instanceof FlatType) {
+            buildFlat(builder, (FlatType) schemaNode.getType(), columnName);
+        }
+    }
+
+    private static void buildRecord(Types.Builder builder, RecordType type, String columnName)
+            throws HyracksDataException {
+        Types.BaseGroupBuilder<?, ?> childBuilder = getGroupChild(builder);
+        for (Map.Entry<String, SchemaNode> entry : type.getChildren().entrySet()) {
+            buildParquetSchema(childBuilder, entry.getValue(), entry.getKey());
+        }
+        childBuilder.named(columnName);
+    }
+
+    private static void buildList(Types.Builder builder, ListType type, String columnName) throws HyracksDataException {
+        Types.BaseListBuilder<?, ?> childBuilder = getListChild(builder);
+        SchemaNode child = type.child;
+        if (child == null) {
+            throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+        }
+        buildParquetSchema(childBuilder, child, columnName);
+    }
+
+    private static void buildFlat(Types.Builder builder, FlatType type, String columnName) throws HyracksDataException {
+        if (type.getPrimitiveTypeName() == null) {
+            // Not sure if this is the right thing to do here
+            throw new HyracksDataException(ErrorCode.EMPTY_TYPE_INFERRED);
+        }
+        getPrimitiveChild(builder, type.getPrimitiveTypeName(), type.getLogicalTypeAnnotation()).named(columnName);
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 206a3c9..0390315 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -74,7 +74,7 @@
                 recordConsumer.addDouble(value);
                 break;
             default:
-                throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
         }
     }
 
@@ -129,7 +129,7 @@
                         recordConsumer.addDouble(floatValue);
                         break;
                     default:
-                        throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                        throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
                 }
                 break;
             case DOUBLE:
@@ -148,7 +148,7 @@
                         recordConsumer.addDouble(doubleValue);
                         break;
                     default:
-                        throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                        throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
                 }
                 break;
             case STRING:
@@ -164,7 +164,7 @@
                             byteArrayOutputStream.getLength()));
 
                 } else {
-                    throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                    throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
                 }
                 break;
             case BOOLEAN:
@@ -172,7 +172,7 @@
                 if (primitiveTypeName == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
                     recordConsumer.addBoolean(booleanValue);
                 } else {
-                    throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                    throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
                 }
                 break;
             case DATE:
@@ -190,7 +190,7 @@
             case MISSING:
                 break;
             default:
-                throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, typeTag, primitiveTypeName);
+                throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag);
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
new file mode 100644
index 0000000..44cd5b2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class SchemaCheckerLazyVisitor implements ISchemaChecker,
+        ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType, ParquetSchemaTree.SchemaNode> {
+    private final FieldNamesDictionary fieldNamesDictionary;
+    private final RecordLazyVisitablePointable record;
+
+    public SchemaCheckerLazyVisitor(IAType typeInfo) {
+        this.fieldNamesDictionary = new FieldNamesDictionary();
+        if (typeInfo.getTypeTag() == ATypeTag.OBJECT) {
+            this.record = new TypedRecordLazyVisitablePointable((ARecordType) typeInfo);
+        } else if (typeInfo.getTypeTag() == ATypeTag.ANY) {
+            this.record = new RecordLazyVisitablePointable(true);
+        } else {
+            throw new RuntimeException("Type Unsupported for parquet printing");
+        }
+    }
+
+    @Override
+    public ISchemaChecker.SchemaComparisonType visit(RecordLazyVisitablePointable pointable,
+            ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            return ISchemaChecker.SchemaComparisonType.GROWING;
+        }
+
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.RecordType)) {
+            return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+        }
+
+        ParquetSchemaTree.RecordType recordType = (ParquetSchemaTree.RecordType) schemaNode.getType();
+        ISchemaChecker.SchemaComparisonType schemaComparisonType = ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            String childColumnName = fieldNamesDictionary.getOrCreateFieldNameIndex(pointable.getFieldName());
+            ParquetSchemaTree.SchemaNode childType = recordType.getChildren().get(childColumnName);
+            if (childType == null) {
+                schemaComparisonType =
+                        ISchemaChecker.max(schemaComparisonType, ISchemaChecker.SchemaComparisonType.GROWING);
+                continue;
+            }
+            schemaComparisonType = ISchemaChecker.max(schemaComparisonType, child.accept(this, childType));
+        }
+        return schemaComparisonType;
+    }
+
+    @Override
+    public ISchemaChecker.SchemaComparisonType visit(AbstractListLazyVisitablePointable pointable,
+            ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            return ISchemaChecker.SchemaComparisonType.GROWING;
+        }
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.ListType)) {
+            return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+        }
+
+        ParquetSchemaTree.ListType listType = (ParquetSchemaTree.ListType) schemaNode.getType();
+        ISchemaChecker.SchemaComparisonType schemaComparisonType = ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+            if (listType.isEmpty()) {
+                schemaComparisonType =
+                        ISchemaChecker.max(schemaComparisonType, ISchemaChecker.SchemaComparisonType.GROWING);
+                continue;
+            }
+            schemaComparisonType = ISchemaChecker.max(schemaComparisonType, child.accept(this, listType.getChild()));
+        }
+        return schemaComparisonType;
+    }
+
+    @Override
+    public ISchemaChecker.SchemaComparisonType visit(FlatLazyVisitablePointable pointable,
+            ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
+        if (schemaNode.getType() == null) {
+            return ISchemaChecker.SchemaComparisonType.GROWING;
+        }
+        if (!(schemaNode.getType() instanceof ParquetSchemaTree.FlatType)) {
+            return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+        }
+
+        ParquetSchemaTree.FlatType flatType = (ParquetSchemaTree.FlatType) schemaNode.getType();
+
+        if (flatType.getPrimitiveTypeName() != AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(pointable.getTypeTag())
+                || !(flatType.getLogicalTypeAnnotation() == AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP
+                        .get(pointable.getTypeTag()))) {
+            return ISchemaChecker.SchemaComparisonType.CONFLICTING;
+        }
+
+        return ISchemaChecker.SchemaComparisonType.EQUIVALENT;
+    }
+
+    @Override
+    public SchemaComparisonType checkSchema(ParquetSchemaTree.SchemaNode schemaNode, IValueReference valueReference)
+            throws HyracksDataException {
+        record.set(valueReference);
+        return record.accept(this, schemaNode);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index b25d1f5..a6ea115 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.writer.printer.parquet;
 
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getGroupChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getListChild;
+import static org.apache.asterix.external.writer.printer.parquet.ParquetSchemaBuilderUtils.getPrimitiveChild;
+
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.types.ARecordType;
@@ -117,35 +121,4 @@
         return null;
     }
 
-    private static Types.BaseGroupBuilder getGroupChild(Types.Builder parent) {
-        if (parent instanceof Types.BaseGroupBuilder) {
-            return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
-        } else if (parent instanceof Types.BaseListBuilder) {
-            return ((Types.BaseListBuilder<?, ?>) parent).optionalGroupElement();
-        } else {
-            return null;
-        }
-    }
-
-    private static Types.BaseListBuilder getListChild(Types.Builder parent) {
-        if (parent instanceof Types.BaseGroupBuilder) {
-            return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
-        } else if (parent instanceof Types.BaseListBuilder) {
-            return ((Types.BaseListBuilder<?, ?>) parent).optionalListElement();
-        } else {
-            return null;
-        }
-    }
-
-    private static Types.Builder getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type,
-            LogicalTypeAnnotation annotation) {
-        if (parent instanceof Types.BaseGroupBuilder) {
-            return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation);
-        } else if (parent instanceof Types.BaseListBuilder) {
-            return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation);
-        } else {
-            return null;
-        }
-    }
-
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index cc456e5..c152853 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -100,7 +100,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
@@ -110,9 +109,6 @@
 import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -143,7 +139,6 @@
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -794,17 +789,9 @@
             IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
             SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
             throws AlgebricksException {
-        String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
-        IExternalFileWriterFactory fileWriterFactory =
-                ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
-        fileWriterFactory.validate();
-        String fileExtension = ExternalWriterProvider.getFileExtension(sink);
-        int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
-        ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
-                fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
-        SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
-                partitionComparatorFactories, inputDesc, writerFactory);
+        IPushRuntimeFactory runtime = ExternalWriterProvider.getWriteFileRuntime(appCtx, sink, sourceType,
+                staticPathExpr, pathSourceLocation, dynamicPathEvalFactory, inputDesc, sourceColumn, partitionColumns,
+                partitionComparatorFactories);
         return new Pair<>(runtime, null);
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 73bdbb8..f58df7e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.zip.Deflater;
 
+import org.apache.asterix.cloud.parquet.ParquetSinkExternalWriterFactory;
 import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
 import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -35,18 +36,30 @@
 import org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
 import org.apache.asterix.external.writer.printer.CsvExternalFilePrinterFactory;
 import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactory;
+import org.apache.asterix.external.writer.printer.ParquetExternalFilePrinterFactoryProvider;
 import org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
 import org.apache.asterix.formats.nontagged.CSVPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
 import org.apache.asterix.metadata.declared.IExternalWriteDataSink;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
 import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.asterix.runtime.writer.PathResolverFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.WriterPartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.util.StorageUtil;
@@ -66,7 +79,7 @@
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
     }
 
-    public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
+    private static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
             String staticPath, SourceLocation pathExpressionLocation) {
         String adapterName = sink.getAdapterName().toLowerCase();
         IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
@@ -78,14 +91,14 @@
         return creator.create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
     }
 
-    public static String getFileExtension(IWriteDataSink sink) {
+    private static String getFileExtension(IWriteDataSink sink) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = getFormat(configuration);
         String compression = getCompression(configuration);
         return format + (compression.isEmpty() ? "" : "." + compression);
     }
 
-    public static int getMaxResult(IWriteDataSink sink) {
+    private static int getMaxResult(IWriteDataSink sink) {
         String maxResultString = sink.getConfiguration().get(ExternalDataConstants.KEY_WRITER_MAX_RESULT);
         if (maxResultString == null) {
             return ExternalDataConstants.WRITER_MAX_RESULT_DEFAULT;
@@ -93,6 +106,14 @@
         return Integer.parseInt(maxResultString);
     }
 
+    private static int getMaxParquetSchema(Map<String, String> conf) {
+        String maxResultString = conf.get(ExternalDataConstants.PARQUET_MAX_SCHEMAS_KEY);
+        if (maxResultString == null) {
+            return ExternalDataConstants.PARQUET_MAX_SCHEMAS_DEFAULT_VALUE;
+        }
+        return Integer.parseInt(maxResultString);
+    }
+
     private static ExternalFileWriterConfiguration createConfiguration(ICcApplicationContext appCtx,
             IWriteDataSink sink, String staticPath, SourceLocation pathExpressionLocation) {
         Map<String, String> params = sink.getConfiguration();
@@ -117,28 +138,69 @@
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }
 
-    public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
-            Object sourceType) throws CompilationException {
+    public static IPushRuntimeFactory getWriteFileRuntime(ICcApplicationContext appCtx, IWriteDataSink sink,
+            Object sourceType, ILogicalExpression staticPathExpr, SourceLocation pathSourceLocation,
+            IScalarEvaluatorFactory dynamicPathEvalFactory, RecordDescriptor inputDesc, int sourceColumn,
+            int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories)
+            throws AlgebricksException {
+        String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
+        IExternalFileWriterFactory fileWriterFactory =
+                ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
+        fileWriterFactory.validate();
+        String fileExtension = ExternalWriterProvider.getFileExtension(sink);
+        int maxResult = ExternalWriterProvider.getMaxResult(sink);
+
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
 
         // Check for supported formats
-        if (!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
-                && !ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)
-                && !ExternalDataConstants.FORMAT_CSV_LOWER_CASE.equalsIgnoreCase(format)) {
+        if (!ExternalDataConstants.WRITER_SUPPORTED_FORMATS.contains(format.toLowerCase())) {
             throw new UnsupportedOperationException("Unsupported format " + format);
         }
 
         String compression = getCompression(configuration);
+        WriterPartitionerFactory partitionerFactory =
+                new WriterPartitionerFactory(partitionColumns, partitionComparatorFactories);
+        PathResolverFactory pathResolverFactory = new PathResolverFactory(fileWriterFactory, fileExtension,
+                dynamicPathEvalFactory, staticPath, pathSourceLocation);
         IPrinterFactory printerFactory;
         IExternalFileCompressStreamFactory compressStreamFactory;
+        IExternalPrinterFactory externalPrinterFactory;
+        ExternalFileWriterFactory writerFactory;
         switch (format) {
             case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
                 compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
                 printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
-                return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+                externalPrinterFactory = new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+                writerFactory = new ExternalFileWriterFactory(fileWriterFactory, externalPrinterFactory,
+                        pathResolverFactory, maxResult);
+
+                return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+                        partitionComparatorFactories, inputDesc, writerFactory);
+            case ExternalDataConstants.FORMAT_CSV_LOWER_CASE:
+                compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
+                if (sink instanceof IExternalWriteDataSink) {
+                    ARecordType itemType = ((IExternalWriteDataSink) sink).getItemType();
+                    if (itemType != null) {
+                        printerFactory =
+                                CSVPrinterFactoryProvider
+                                        .createInstance(itemType, sink.getConfiguration(),
+                                                ((IExternalWriteDataSink) sink).getSourceLoc())
+                                        .getPrinterFactory(sourceType);
+                        externalPrinterFactory =
+                                new CsvExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+                        writerFactory = new ExternalFileWriterFactory(fileWriterFactory, externalPrinterFactory,
+                                pathResolverFactory, maxResult);
+                        return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+                                partitionComparatorFactories, inputDesc, writerFactory);
+                    } else {
+                        throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+                    }
+                } else {
+                    throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+                }
+
             case ExternalDataConstants.FORMAT_PARQUET:
-                String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
 
                 CompressionCodecName compressionCodecName;
                 if (compression == null || compression.equals("") || compression.equals("none")) {
@@ -152,31 +214,32 @@
 
                 long rowGroupSize = StorageUtil.getByteValue(rowGroupSizeString);
                 int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
-
                 ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);
 
-                return new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
-                        (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
-            case ExternalDataConstants.FORMAT_CSV_LOWER_CASE:
-                compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
-                if (sink instanceof IExternalWriteDataSink) {
-                    ARecordType itemType = ((IExternalWriteDataSink) sink).getItemType();
-                    if (itemType != null) {
-                        printerFactory =
-                                CSVPrinterFactoryProvider
-                                        .createInstance(itemType, sink.getConfiguration(),
-                                                ((IExternalWriteDataSink) sink).getSourceLoc())
-                                        .getPrinterFactory(sourceType);
-                        return new CsvExternalFilePrinterFactory(printerFactory, compressStreamFactory);
-                    } else {
-                        throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
-                    }
-                } else {
-                    throw new CompilationException(ErrorCode.INVALID_CSV_SCHEMA);
+                if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
+                    String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+                    ParquetExternalFilePrinterFactory parquetPrinterFactory =
+                            new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+                                    (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
+
+                    ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory,
+                            parquetPrinterFactory, pathResolverFactory, maxResult);
+                    return new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
+                            partitionComparatorFactories, inputDesc, parquetWriterFactory);
                 }
+
+                int maxSchemas = ExternalWriterProvider.getMaxParquetSchema(configuration);
+                ParquetExternalFilePrinterFactoryProvider printerFactoryProvider =
+                        new ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType) sourceType,
+                                rowGroupSize, pageSize, writerVersion);
+                return new ParquetSinkExternalWriterFactory(partitionerFactory, inputDesc, sourceColumn,
+                        (IAType) sourceType, maxSchemas, fileWriterFactory, maxResult, printerFactoryProvider,
+                        pathResolverFactory);
+
             default:
                 throw new UnsupportedOperationException("Unsupported format " + format);
         }
+
     }
 
     private static ParquetProperties.WriterVersion getParquetWriterVersion(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index d7a71bf..b466d0d 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -154,5 +154,9 @@
       <groupId>org.locationtech.jts</groupId>
       <artifactId>jts-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index c6d1dbe..e3ccc33 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -32,7 +32,7 @@
 import org.apache.hyracks.util.string.UTF8CharBuffer;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
-final class DynamicPathResolver extends AbstractPathResolver {
+public final class DynamicPathResolver extends AbstractPathResolver {
     private final IScalarEvaluator pathEval;
     private final IWarningCollector warningCollector;
     private final StringBuilder dirStringBuilder;
@@ -40,7 +40,7 @@
     private final UTF8CharBuffer charBuffer;
     private final SourceLocation pathSourceLocation;
 
-    DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
+    public DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
             IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
         super(fileExtension, fileSeparator, partition);
         this.pathEval = pathEval;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 95dc962..e8bae00 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-final class ExternalFileWriter implements IExternalWriter {
+public final class ExternalFileWriter implements IExternalWriter {
     static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
     private final IPathResolver pathResolver;
     private final IExternalFileWriter writer;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index 5981584..f85a8ab 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -18,52 +18,29 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ExternalFileWriterFactory implements IExternalWriterFactory {
     private static final long serialVersionUID = 1412969574113419638L;
     private final IExternalFileWriterFactory writerFactory;
     private final IExternalPrinterFactory printerFactory;
-    private final String fileExtension;
     private final int maxResult;
-    private final IScalarEvaluatorFactory pathEvalFactory;
-    private final String staticPath;
-    private final SourceLocation pathSourceLocation;
+    private final IPathResolverFactory pathResolverFactory;
 
     public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
-            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
-            SourceLocation pathSourceLocation) {
+            IPathResolverFactory pathResolverFactory, int maxResult) {
         this.writerFactory = writerFactory;
         this.printerFactory = printerFactory;
-        this.fileExtension = fileExtension;
+        this.pathResolverFactory = pathResolverFactory;
         this.maxResult = maxResult;
-        this.pathEvalFactory = pathEvalFactory;
-        this.staticPath = staticPath;
-        this.pathSourceLocation = pathSourceLocation;
     }
 
     @Override
     public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
-        int partition = context.getTaskAttemptId().getTaskId().getPartition();
-        char fileSeparator = writerFactory.getSeparator();
-        IPathResolver resolver;
-        if (staticPath == null) {
-            EvaluatorContext evaluatorContext = new EvaluatorContext(context);
-            IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
-            IWarningCollector warningCollector = context.getWarningCollector();
-            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
-                    pathSourceLocation);
-        } else {
-            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
-        }
+        IPathResolver resolver = pathResolverFactory.createResolver(context);
         IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
         return new ExternalFileWriter(resolver, writer, maxResult);
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
index 35b4ddd..c4a3b04 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -24,7 +24,7 @@
 /**
  * Path resolver which generates paths for the written files
  */
-interface IPathResolver {
+public interface IPathResolver {
 
     /**
      * Extract the partitioning values from the provided tuple and generates the file path
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
similarity index 69%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
index b6ec758..18a8799 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolverFactory.java
@@ -16,22 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.asterix.runtime.writer;
 
-USE test;
+import java.io.Serializable;
 
-COPY (
-   select c.* from TestCollection c
-) toWriter
-TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
-WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
-    "format":"parquet"
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPathResolverFactory extends Serializable {
+
+    IPathResolver createResolver(IHyracksTaskContext ctx) throws HyracksDataException;
+
 }
-
-
-
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java
new file mode 100644
index 0000000..8707cae
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/PathResolverFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class PathResolverFactory implements IPathResolverFactory {
+    private static final long serialVersionUID = 8971234908711238L;
+    private final IExternalFileWriterFactory writerFactory;
+    private final String fileExtension;
+    private final IScalarEvaluatorFactory pathEvalFactory;
+    private final String staticPath;
+    private final SourceLocation pathSourceLocation;
+
+    public PathResolverFactory(IExternalFileWriterFactory writerFactory, String fileExtension,
+            IScalarEvaluatorFactory pathEvalFactory, String staticPath, SourceLocation pathSourceLocation) {
+        this.writerFactory = writerFactory;
+        this.fileExtension = fileExtension;
+        this.pathEvalFactory = pathEvalFactory;
+        this.pathSourceLocation = pathSourceLocation;
+        this.staticPath = staticPath;
+    }
+
+    @Override
+    public IPathResolver createResolver(IHyracksTaskContext context) throws HyracksDataException {
+        int partition = context.getTaskAttemptId().getTaskId().getPartition();
+        char fileSeparator = writerFactory.getSeparator();
+        IPathResolver resolver;
+        if (staticPath == null) {
+            EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+            IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+            IWarningCollector warningCollector = context.getWarningCollector();
+            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+                    pathSourceLocation);
+        } else {
+            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
+        }
+        return resolver;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
index 52943ed..c5fa0dd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -20,10 +20,10 @@
 
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-final class StaticPathResolver extends AbstractPathResolver {
+public final class StaticPathResolver extends AbstractPathResolver {
     private final String directoryPath;
 
-    StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
+    public StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
         super(fileExtension, fileSeparator, partition);
 
         if (!directoryPath.isEmpty() && directoryPath.charAt(directoryPath.length() - 1) != fileSeparator) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
index cb72aec..7cf4d05 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -21,7 +21,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-interface IWriterPartitioner {
+public interface IWriterPartitioner {
     boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException;
 
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 321828f..3612511 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -23,7 +23,6 @@
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,64 +30,35 @@
 public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
     private static final long serialVersionUID = -2215789207336628581L;
     private final int sourceColumn;
-    private final int[] partitionColumns;
-    private final IBinaryComparatorFactory[] partitionComparatorFactories;
-    private final boolean partitionByKey;
     private final RecordDescriptor inputRecordDescriptor;
     private final IExternalWriterFactory writerFactory;
+    private final WriterPartitionerFactory partitionerFactory;
 
     public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
             IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
             IExternalWriterFactory writerFactory) {
-        this(sourceColumn, partitionColumns, partitionComparatorFactories, false, inputRecordDescriptor, writerFactory);
+        this(sourceColumn, inputRecordDescriptor, writerFactory,
+                new WriterPartitionerFactory(partitionColumns, partitionComparatorFactories));
     }
 
     public SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor inputRecordDescriptor,
             IExternalWriterFactory writerFactory) {
-        this(sourceColumn, null, null, true, inputRecordDescriptor, writerFactory);
+        this(sourceColumn, inputRecordDescriptor, writerFactory, new WriterPartitionerFactory());
     }
 
-    private SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
-            IBinaryComparatorFactory[] partitionComparatorFactories, boolean partitionByKey,
-            RecordDescriptor inputRecordDescriptor, IExternalWriterFactory writerFactory) {
+    private SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor inputRecordDescriptor,
+            IExternalWriterFactory writerFactory, WriterPartitionerFactory partitionerFactory) {
         this.sourceColumn = sourceColumn;
-        this.partitionColumns = partitionColumns;
-        this.partitionComparatorFactories = partitionComparatorFactories;
-        this.partitionByKey = partitionByKey;
         this.inputRecordDescriptor = inputRecordDescriptor;
         this.writerFactory = writerFactory;
+        this.partitionerFactory = partitionerFactory;
     }
 
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
         IExternalWriter writer = writerFactory.createWriter(ctx);
-        SinkExternalWriterRuntime runtime =
-                new SinkExternalWriterRuntime(sourceColumn, createPartitioner(), inputRecordDescriptor, writer);
+        SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn,
+                partitionerFactory.createPartitioner(), inputRecordDescriptor, writer);
         return new IPushRuntime[] { runtime };
     }
-
-    /**
-     * Creates the writer partitioner based on the provided parameters
-     *
-     * @return writer partitioner
-     */
-    private IWriterPartitioner createPartitioner() {
-        // key writer partitioner
-        if (partitionByKey) {
-            return KeyWriterPartitioner.INSTANCE;
-        }
-
-        // writer partitioner
-        if (partitionColumns.length > 0) {
-            IBinaryComparator[] comparators = new IBinaryComparator[partitionComparatorFactories.length];
-            for (int i = 0; i < partitionComparatorFactories.length; i++) {
-                comparators[i] = partitionComparatorFactories[i].createBinaryComparator();
-            }
-
-            return new WriterPartitioner(partitionColumns, comparators);
-        }
-
-        // no-op partitioner
-        return new NoOpWriterPartitioner();
-    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java
new file mode 100644
index 0000000..1d5b089
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitionerFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class WriterPartitionerFactory implements Serializable {
+    private static final long serialVersionUID = 8971234908711239L;
+    private final boolean partitionByKey;
+    private final int[] partitionColumns;
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+
+    public WriterPartitionerFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories) {
+        this(partitionColumns, partitionComparatorFactories, false);
+    }
+
+    public WriterPartitionerFactory() {
+        this(null, null, true);
+    }
+
+    private WriterPartitionerFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+            boolean partitionByKey) {
+        this.partitionColumns = partitionColumns;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.partitionByKey = partitionByKey;
+    }
+
+    /**
+     * Creates the writer partitioner based on the provided parameters
+     *
+     * @return writer partitioner
+     */
+    public IWriterPartitioner createPartitioner() {
+        // key writer partitioner
+        if (partitionByKey) {
+            return KeyWriterPartitioner.INSTANCE;
+        }
+
+        // writer partitioner
+        if (partitionColumns.length > 0) {
+            IBinaryComparator[] comparators = new IBinaryComparator[partitionComparatorFactories.length];
+            for (int i = 0; i < partitionComparatorFactories.length; i++) {
+                comparators[i] = partitionComparatorFactories[i].createBinaryComparator();
+            }
+
+            return new WriterPartitioner(partitionColumns, comparators);
+        }
+
+        // no-op partitioner
+        return new NoOpWriterPartitioner();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 02327e2..7cb107d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -160,6 +160,8 @@
     FRAME_BIGGER_THAN_SORT_MEMORY(130),
     RESULT_DOES_NOT_FOLLOW_SCHEMA(131),
     EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
+    EMPTY_TYPE_INFERRED(133),
+    SCHEMA_LIMIT_EXCEEDED(134),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 226234f..e1fbe30 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -150,6 +150,8 @@
 130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
 131 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
 132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
+133 = Schema could not be inferred, empty types found in the result
+134 = Schema Limit exceeded, maximum number of heterogeneous schemas allowed : '%1$s'
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
