[ASTERIXDB-3519][EXT]: Support writing query results to HDFS
- user model changes: Added Kerberos authentication and write support for HDFS
- storage format changes: no
- interface changes: no
details:
- Support Kerberos authentication for reading from and writing to HDFS.
- Add write support for HDFS.
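
A minimal sketch of the new write path, drawn from the tests added below
(the endpoint and paths are test values, not defaults):

    USE test;
    COPY (
       SELECT c.* FROM TestCollection c
    ) toWriter
    TO hdfs
    PATH ("copy-to-result", "example")
    TYPE ( { id : int, name : string } )
    WITH {
       "hdfs":"hdfs://127.0.0.1:31888",
       "format":"parquet"
    };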
Ext-ref: MB-63117
Change-Id: Ifc4ed9fb17a7540cf444a2f8bef590756b909988
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18767
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 1a5cb66..0a96943 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -82,4 +82,12 @@
+ BLOB_ENDPOINT_PLACEHOLDER + "\")";
public static final String TEMPLATE_DEFAULT = TEMPLATE;
}
+
+ public static class HDFS {
+ public static final String HDFS_AUTHENTICATION_DEFAULT = "kerberos";
+ public static final String KERBEROS_PRINCIPAL_DEFAULT = "hdfsuser@EXAMPLE.COM";
+ public static final String KERBEROS_PASSWORD_DEFAULT = "hdfspassword";
+ public static final String KERBEROS_REALM_DEFAULT = "EXAMPLE.COM";
+ public static final String KERBEROS_KDC_DEFAULT = "localhost:8800";
+ }
}
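
These defaults mirror the per-service constant holders already used by the
S3/Azure/GCS test suites. A hedged sketch of where they would surface in a
statement; the option names below ("authentication", "kerberos-principal",
"kerberos-password") are illustrative placeholders, not confirmed
configuration keys:

    COPY (SELECT c.* FROM TestCollection c) toWriter
    TO hdfs
    PATH ("copy-to-result", "secure")
    TYPE ( { id : int } )
    WITH {
       "hdfs":"hdfs://127.0.0.1:31888",
       "format":"parquet",
       -- hypothetical keys showing where the TestConstants.HDFS values
       -- (principal, password) would be supplied
       "authentication":"kerberos",
       "kerberos-principal":"hdfsuser@EXAMPLE.COM",
       "kerberos-password":"hdfspassword"
    };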
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 37527bd..3de0cd7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -2424,6 +2424,8 @@
str = applyAzureSubstitution(str, placeholders);
} else if (placeholder.getValue().equalsIgnoreCase("GCS")) {
str = applyGCSSubstitution(str, placeholders);
+ } else if (placeholder.getValue().equalsIgnoreCase("HDFS")) {
+ str = applyHDFSSubstitution(str, placeholders);
}
} else {
// Any other placeholders, just replace with the value
@@ -2534,6 +2536,11 @@
return str;
}
+ protected String applyHDFSSubstitution(String str, List<Placeholder> placeholders) {
+ str = setHDFSTemplateDefault(str);
+ return str;
+ }
+
protected String setAzureTemplate(String str) {
return str.replace("%template%", TEMPLATE);
}
@@ -2546,6 +2553,10 @@
return str;
}
+ protected String setHDFSTemplateDefault(String str) {
+ return str;
+ }
+
protected void fail(boolean runDiagnostics, TestCaseContext testCaseCtx, CompilationUnit cUnit,
List<TestFileContext> testFileCtxs, ProcessBuilder pb, File testFile, Exception e) throws Exception {
if (runDiagnostics) {
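
Note that the HDFS branch is currently a pass-through: applyHDFSSubstitution
delegates to setHDFSTemplateDefault, which returns the string unchanged, so
it serves as a hook for future %template%-style replacements analogous to the
Azure and GCS paths. A hypothetical placeholder a later change could wire
through this hook (the %hdfs-endpoint% token is illustrative and is not
defined by this change):

    COPY (SELECT "123" AS id) toWriter
    TO hdfs
    PATH ("copy-to-result", "placeholder-demo")
    TYPE ( { id : string } )
    WITH {
       -- setHDFSTemplateDefault would rewrite this token; today it is a no-op
       "hdfs":"%hdfs-endpoint%",
       "format":"parquet"
    };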
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
new file mode 100644
index 0000000..56e79c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+
+CREATE TYPE ColumnType1 AS {
+ id: integer,
+ name : string
+};
+
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
new file mode 100644
index 0000000..d2a376c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TestCollection({"id":18, "name": "Virat" , "dateType":date("1988-11-05"), "timeType": time("03:10:00.493Z") , "boolType" : false , "doubleType" : 0.75, "datetimeType" : datetime("1900-02-01T00:00:00") });
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
new file mode 100644
index 0000000..a95146f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO hdfs
+PATH ("copy-to-result", "parquet-cover-data-types")
+TYPE ( { name : string, id : int, dateType : date, timeType : time, boolType : boolean, doubleType : double, datetimeType : datetime } )
+WITH {
+ "hdfs":"hdfs://127.0.0.1:31888/",
+ "format":"parquet"
+};
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.04.ddl.sqlpp
new file mode 100644
index 0000000..f23893b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.04.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+CREATE TYPE ColumnType2 AS {
+};
+
+
+
+CREATE EXTERNAL DATASET TestDataset(ColumnType2) USING hdfs
+(
+ ("hdfs"="hdfs://127.0.0.1:31888/"),
+ ("path"="copy-to-result/parquet-cover-data-types/"),
+ ("include"="*.parquet"),
+ ("input-format"="parquet-input-format"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
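
The read-back side of each test declares the external dataset over an empty,
open type, so no field is fixed by the DDL and the schema of the written
Parquet files determines what comes back. The ORDER BY in the follow-up query
keeps the result deterministic even when the writer produces multiple files.
A condensed sketch of the verification step, reusing the names above:

    USE test;
    -- ColumnType2 is deliberately empty: every field is open and is
    -- reconstructed from the Parquet file schema
    SELECT c.* FROM TestDataset c ORDER BY c.id;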
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.query.sqlpp
new file mode 100644
index 0000000..b03fc5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM TestDataset c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.01.ddl.sqlpp
new file mode 100644
index 0000000..6be9489
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.01.ddl.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+
+CREATE TYPE ColumnType1 AS {
+ id: integer
+};
+
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.02.update.sqlpp
new file mode 100644
index 0000000..d569ee5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.02.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+INSERT INTO TestCollection {"id":2,"name":{"first":["power","star"]}};
+INSERT INTO TestCollection {"id":5,"name":{"first":[]}};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.03.update.sqlpp
new file mode 100644
index 0000000..5a66928
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.03.update.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+   select c.* from TestCollection c
+) toWriter
+TO hdfs
+PATH ("copy-to-result", "parquet-empty-array")
+TYPE ( { id : int, name : { first : [ string ] } } )
+WITH {
+    "hdfs":"hdfs://127.0.0.1:31888",
+    "format":"parquet"
+};
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.04.ddl.sqlpp
new file mode 100644
index 0000000..a7ee67f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.04.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+CREATE TYPE ColumnType2 AS {
+};
+
+
+
+CREATE EXTERNAL DATASET TestDataset(ColumnType2) USING hdfs
+(
+ ("hdfs"="hdfs://127.0.0.1:31888/"),
+ ("path"="copy-to-result/parquet-empty-array/"),
+ ("include"="*.parquet"),
+ ("input-format"="parquet-input-format"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.query.sqlpp
new file mode 100644
index 0000000..b03fc5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM TestDataset c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.01.ddl.sqlpp
new file mode 100644
index 0000000..76970a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.01.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE ColumnType2 AS {
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.02.update.sqlpp
new file mode 100644
index 0000000..1183752
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.02.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+COPY (
+ select "123" as id
+) toWriter
+TO hdfs
+PATH ("copy-to-result", "parquet-simple")
+TYPE ( {id:string} )
+WITH {
+ "hdfs":"hdfs://127.0.0.1:31888",
+ "format":"parquet",
+ "version" : "2"
+};
\ No newline at end of file
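
parquet-simple is the only test that sets "version"; it appears to select the
Parquet format version used by the writer, with the other tests relying on
the writer's default. A minimal variant under that assumption:

    COPY (
       select "123" as id
    ) toWriter
    TO hdfs
    PATH ("copy-to-result", "parquet-simple-default")
    TYPE ( {id:string} )
    WITH {
       -- "version" omitted: the writer falls back to its default
       -- Parquet format version
       "hdfs":"hdfs://127.0.0.1:31888",
       "format":"parquet"
    };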
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.03.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.03.ddl.sqlpp
new file mode 100644
index 0000000..e31c9b0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.03.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+CREATE EXTERNAL DATASET DatasetCopy(ColumnType2) USING hdfs
+(
+ ("hdfs"="hdfs://127.0.0.1:31888/"),
+ ("path"="copy-to-result/parquet-simple"),
+ ("format" = "parquet"),
+ ("input-format"="parquet-input-format"),
+ ("requireVersionChangeDetection"="false"),
+ ("include"="*.parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.04.query.sqlpp
new file mode 100644
index 0000000..5aeedb8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-simple/parquet-simple.04.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT id
+FROM DatasetCopy c
+ORDER BY c.id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.01.ddl.sqlpp
new file mode 100644
index 0000000..f890e0d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.01.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE ColumnType1 AS {
+ id: string
+};
+
+CREATE DATASET DummyTweetDataset(ColumnType1)
+PRIMARY KEY id WITH {
+ "storage-format": {"format" : "column"}
+};
+
+
+CREATE TYPE ColumnType2 AS {
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.02.update.sqlpp
new file mode 100644
index 0000000..83a1140
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.02.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+LOAD DATASET DummyTweetDataset USING localfs
+(
+ ("path" = "asterix_nc1://data/hdfs/parquet/dummy_tweet.json"),
+ ("format" = "json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.03.update.sqlpp
new file mode 100644
index 0000000..9bae74b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.03.update.sqlpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ SELECT c.* FROM DummyTweetDataset c
+) toWriter
+TO hdfs
+PATH ("copy-to-result", "parquet-tweet")
+TYPE ( {
+ coordinates: {
+ coordinates: [
+ double
+ ],
+ `type` : string
+ },
+ created_at: string,
+ entities: {
+ urls: [
+ {
+ display_url: string,
+ expanded_url: string,
+ indices: [
+ int
+ ],
+ url: string
+ }
+ ],
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ geo: {
+ coordinates: [
+ double
+ ],
+ `type`: string
+ },
+ id: string,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ place: {
+ bounding_box: {
+ coordinates: [
+ [
+ [
+ double
+ ]
+ ]
+ ],
+ `type`: string
+ },
+ country: string,
+ country_code: string,
+ full_name: string,
+ id: string,
+ name: string,
+ place_type: string,
+ url: string
+ },
+ possibly_sensitive: boolean,
+ quoted_status: {
+ created_at: string,
+ entities: {
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ id: int,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ verified: boolean
+ }
+ },
+ quoted_status_id: int,
+ quoted_status_id_str: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ timestamp_ms: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ location: string,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ time_zone: string,
+ url: string,
+ utc_offset: int,
+ verified: boolean
+ }
+ } )
+WITH {
+ "hdfs":"hdfs://127.0.0.1:31888",
+ "format":"parquet"
+};
+
+
+
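
The TYPE clause above spells out the full tweet schema, but the mechanics are
just nested records and arrays: each nested record maps to a Parquet group
and each array to a repeated field. A condensed sketch of the same mapping
(not part of the test suite):

    COPY (SELECT c.* FROM DummyTweetDataset c) toWriter
    TO hdfs
    PATH ("copy-to-result", "parquet-tweet-mini")
    TYPE ( {
       id: string,
       geo: { coordinates: [ double ], `type`: string },
       entities: { urls: [ { url: string, indices: [ int ] } ] }
    } )
    WITH {
       "hdfs":"hdfs://127.0.0.1:31888",
       "format":"parquet"
    };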
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.04.ddl.sqlpp
new file mode 100644
index 0000000..62467d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.04.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+
+CREATE EXTERNAL DATASET DummyTweetDatasetCopy(ColumnType2) USING hdfs
+(
+ ("hdfs"="hdfs://127.0.0.1:31888/"),
+ ("path"="copy-to-result/parquet-tweet/"),
+ ("include"="*.parquet"),
+ ("input-format"="parquet-input-format"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.05.query.sqlpp
new file mode 100644
index 0000000..13587f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-tweet/parquet-tweet.05.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM DummyTweetDatasetCopy c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.01.ddl.sqlpp
new file mode 100644
index 0000000..dfc64ce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.01.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE ColumnType1 AS {
+ id: int
+};
+
+CREATE DATASET NameCommentDataset(ColumnType1)
+PRIMARY KEY id WITH {
+ "storage-format": {"format" : "column"}
+};
+
+
+CREATE TYPE ColumnType2 AS {
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.02.update.sqlpp
new file mode 100644
index 0000000..8591369
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.02.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+LOAD DATASET NameCommentDataset USING localfs
+(
+ ("path" = "asterix_nc1://data/hdfs/parquet/id_name_comment.json"),
+ ("format" = "json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.03.update.sqlpp
new file mode 100644
index 0000000..79b2dda
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.03.update.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+COPY (
+ SELECT c.* FROM NameCommentDataset c
+) toWriter
+TO hdfs
+PATH ("copy-to-result", "parquet-utf8")
+TYPE ( { comment:string, id:bigint, name:string } )
+WITH {
+ "hdfs":"hdfs://127.0.0.1:31888",
+ "format":"parquet"
+};
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.04.ddl.sqlpp
new file mode 100644
index 0000000..49c9b0f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.04.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+
+CREATE EXTERNAL DATASET NameCommentDatasetCopy(ColumnType2) USING hdfs
+(
+ ("hdfs"="hdfs://127.0.0.1:31888"),
+ ("path"="copy-to-result/parquet-utf8/"),
+ ("include"="*.parquet"),
+ ("input-format"="parquet-input-format"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.05.query.sqlpp
new file mode 100644
index 0000000..17cd027
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to-hdfs/parquet-utf8/parquet-utf8.05.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT c.*
+FROM NameCommentDatasetCopy c
+ORDER BY c.id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.adm
new file mode 100644
index 0000000..8fc863e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-cover-data-types/parquet-cover-data-types.05.adm
@@ -0,0 +1 @@
+{ "name": "Virat", "id": 18, "dateType": date("1988-11-05"), "timeType": time("03:10:00.493"), "boolType": false, "doubleType": 0.75, "datetimeType": datetime("1900-02-01T00:00:00.000") }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.adm
new file mode 100644
index 0000000..97de7e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-empty-array/parquet-empty-array.05.adm
@@ -0,0 +1,2 @@
+{ "id": 2, "name": { "first": [ "power", "star" ] } }
+{ "id": 5, "name": { "first": [ ] } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-simple/parquet-simple.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-simple/parquet-simple.04.adm
new file mode 100644
index 0000000..bf567b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-simple/parquet-simple.04.adm
@@ -0,0 +1 @@
+{ "id": "123" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-tweet/parquet-tweet.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-tweet/parquet-tweet.05.adm
new file mode 100644
index 0000000..5e0df96
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-tweet/parquet-tweet.05.adm
@@ -0,0 +1,2 @@
+{ "coordinates": { "coordinates": [ 1.1 ], "type": "string" }, "created_at": "string", "entities": { "urls": [ { "display_url": "string", "expanded_url": "string", "indices": [ 1 ], "url": "string" } ], "user_mentions": [ { "id": 1, "id_str": "string", "indices": [ 1 ], "name": "string", "screen_name": "string" } ] }, "favorite_count": 1, "favorited": true, "filter_level": "string", "geo": { "coordinates": [ 1.1 ], "type": "string" }, "id": "0000000", "id_str": "string", "in_reply_to_screen_name": "string", "in_reply_to_status_id": 1, "in_reply_to_status_id_str": "string", "in_reply_to_user_id": 1, "in_reply_to_user_id_str": "string", "is_quote_status": true, "lang": "string", "place": { "bounding_box": { "coordinates": [ [ [ 1.1 ] ] ], "type": "string" }, "country": "string", "country_code": "string", "full_name": "string", "id": "string", "name": "string", "place_type": "string", "url": "string" }, "possibly_sensitive": true, "quoted_status": { "created_at": "string", "entities": { "user_mentions": [ { "id": 1, "id_str": "string", "indices": [ 1 ], "name": "string", "screen_name": "string" } ] }, "favorite_count": 1, "favorited": true, "filter_level": "string", "id": 1, "id_str": "string", "in_reply_to_screen_name": "string", "in_reply_to_status_id": 1, "in_reply_to_status_id_str": "string", "in_reply_to_user_id": 1, "in_reply_to_user_id_str": "string", "is_quote_status": true, "lang": "string", "retweet_count": 1, "retweeted": true, "source": "string", "text": "string", "truncated": true, "user": { "contributors_enabled": true, "created_at": "string", "default_profile": true, "default_profile_image": true, "description": "string", "favourites_count": 1, "followers_count": 1, "friends_count": 1, "geo_enabled": true, "id": 1, "id_str": "string", "is_translator": true, "lang": "string", "listed_count": 1, "name": "string", "profile_background_color": "string", "profile_background_image_url": "string", "profile_background_image_url_https": "string", "profile_background_tile": true, "profile_banner_url": "string", "profile_image_url": "string", "profile_image_url_https": "string", "profile_link_color": "string", "profile_sidebar_border_color": "string", "profile_sidebar_fill_color": "string", "profile_text_color": "string", "profile_use_background_image": true, "protected": true, "screen_name": "string", "statuses_count": 1, "verified": true } }, "quoted_status_id": 1, "quoted_status_id_str": "string", "retweet_count": 1, "retweeted": true, "source": "string", "text": "string", "timestamp_ms": "string", "truncated": true, "user": { "contributors_enabled": true, "created_at": "string", "default_profile": true, "default_profile_image": true, "description": "string", "favourites_count": 1, "followers_count": 1, "friends_count": 1, "geo_enabled": true, "id": 1, "id_str": "string", "is_translator": true, "lang": "string", "listed_count": 1, "location": "string", "name": "string", "profile_background_color": "string", "profile_background_image_url": "string", "profile_background_image_url_https": "string", "profile_background_tile": true, "profile_banner_url": "string", "profile_image_url": "string", "profile_image_url_https": "string", "profile_link_color": "string", "profile_sidebar_border_color": "string", "profile_sidebar_fill_color": "string", "profile_text_color": "string", "profile_use_background_image": true, "protected": true, "screen_name": "string", "statuses_count": 1, "time_zone": "string", "url": "string", "utc_offset": 1, "verified": true } }
+{ "coordinates": { "coordinates": [ 1.1 ], "type": "string" }, "created_at": "string", "favorite_count": 1, "favorited": true, "filter_level": "string", "geo": { "coordinates": [ 1.1 ], "type": "string" }, "id": "11111111111111111111", "id_str": "string", "in_reply_to_screen_name": "string", "in_reply_to_status_id": 1, "in_reply_to_status_id_str": "string", "in_reply_to_user_id": 1, "in_reply_to_user_id_str": "string", "is_quote_status": true, "lang": "string", "place": { "bounding_box": { "coordinates": [ [ [ 1.1 ] ] ], "type": "string" }, "country": "string", "country_code": "string", "full_name": "string", "id": "string", "name": "string", "place_type": "string", "url": "string" }, "possibly_sensitive": true, "quoted_status": { "created_at": "string", "entities": { "user_mentions": [ { "id": 1, "id_str": "string", "indices": [ 1 ], "name": "string", "screen_name": "string" } ] }, "favorite_count": 1, "favorited": true, "filter_level": "string", "id": 1, "id_str": "string", "in_reply_to_screen_name": "string", "in_reply_to_status_id": 1, "in_reply_to_status_id_str": "string", "in_reply_to_user_id": 1, "in_reply_to_user_id_str": "string", "is_quote_status": true, "lang": "string", "retweet_count": 1, "retweeted": true, "source": "string", "text": "string", "truncated": true, "user": { "contributors_enabled": true, "created_at": "string", "default_profile": true, "default_profile_image": true, "description": "string", "favourites_count": 1, "followers_count": 1, "friends_count": 1, "geo_enabled": true, "id": 1, "id_str": "string", "is_translator": true, "lang": "string", "listed_count": 1, "name": "string", "profile_background_color": "string", "profile_background_image_url": "string", "profile_background_image_url_https": "string", "profile_background_tile": true, "profile_banner_url": "string", "profile_image_url": "string", "profile_image_url_https": "string", "profile_link_color": "string", "profile_sidebar_border_color": "string", "profile_sidebar_fill_color": "string", "profile_text_color": "string", "profile_use_background_image": true, "protected": true, "screen_name": "string", "statuses_count": 1, "verified": true } }, "quoted_status_id": 1, "quoted_status_id_str": "string", "retweet_count": 1, "retweeted": true, "source": "string", "text": "string", "timestamp_ms": "string", "truncated": true, "user": { "contributors_enabled": true, "created_at": "string", "default_profile": true, "default_profile_image": true, "description": "string", "favourites_count": 1, "followers_count": 1, "friends_count": 1, "geo_enabled": true, "id": 1, "id_str": "string", "is_translator": true, "lang": "string", "listed_count": 1, "location": "string", "name": "string", "profile_background_color": "string", "profile_background_image_url": "string", "profile_background_image_url_https": "string", "profile_background_tile": true, "profile_banner_url": "string", "profile_image_url": "string", "profile_image_url_https": "string", "profile_link_color": "string", "profile_sidebar_border_color": "string", "profile_sidebar_fill_color": "string", "profile_text_color": "string", "profile_use_background_image": true, "protected": true, "screen_name": "string", "statuses_count": 1, "time_zone": "string", "url": "string", "utc_offset": 1, "verified": true } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-utf8/parquet-utf8.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-utf8/parquet-utf8.05.adm
new file mode 100644
index 0000000..c60145d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to-hdfs/parquet-utf8/parquet-utf8.05.adm
@@ -0,0 +1,8 @@
+{ "id": 1, "name": "John" }
+{ "id": 2, "name": "Abel" }
+{ "id": 3, "name": "Sandy" }
+{ "id": 4, "name": "Alex" }
+{ "id": 5, "name": "Mike" }
+{ "id": 6, "name": "Tom" }
+{ "comment": "😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا", "id": 7, "name": "Jerry" }
+{ "comment": "😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. 
Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. 
Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا😢😢💉💉 = 𩸽 😢😢💉💉. Coffee ☕‼️😃. حسنا", "id": 8, "name": "William" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ecbd4b9..ee9dc3b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -114,7 +114,7 @@
<test-case FilePath="copy-to/negative">
<compilation-unit name="supported-adapter-format-compression">
<output-dir compare="Text">supported-adapter-format-compression</output-dir>
- <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [gcs, localfs, s3]</expected-error>
+ <expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [gcs, hdfs, localfs, s3]</expected-error>
<expected-error>ASX1189: Unsupported writing format 'avro'. Supported formats: [csv, json, parquet]</expected-error>
<expected-error>ASX1202: Unsupported compression scheme rar. Supported schemes for json are [gzip]</expected-error>
</compilation-unit>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
index 9e39211..d14746a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
@@ -66,4 +66,31 @@
</test-case>
-->
</test-group>
+ <test-group name="copy-to">
+ <test-case FilePath="copy-to-hdfs">
+ <compilation-unit name="parquet-simple">
+ <output-dir compare="Text">parquet-simple</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to-hdfs">
+ <compilation-unit name="parquet-tweet">
+ <output-dir compare="Text">parquet-tweet</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to-hdfs">
+ <compilation-unit name="parquet-utf8">
+ <output-dir compare="Text">parquet-utf8</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to-hdfs">
+ <compilation-unit name="parquet-cover-data-types">
+ <output-dir compare="Text">parquet-cover-data-types</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="copy-to-hdfs">
+ <compilation-unit name="parquet-empty-array">
+ <output-dir compare="Text">parquet-empty-array</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 319a7eb..0cf6eb2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -313,6 +313,7 @@
INVALID_CSV_SCHEMA(1208),
MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
STORAGE_SIZE_NOT_APPLICABLE_TO_TYPE(1210),
+ COULD_NOT_CREATE_TOKENS(1211),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index bbe7582..adb26fe 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -315,6 +315,7 @@
1208 = Invalid Copy to CSV schema
1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
1210 = Retrieving storage size is not applicable to type: %1$s.
+1211 = Could not create delegation tokens
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index edd42f6..a27e325 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -23,10 +23,12 @@
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -51,6 +53,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -85,12 +89,25 @@
private String nodeName;
private Class recordReaderClazz;
private IExternalFilterEvaluatorFactory filterEvaluatorFactory;
+ private transient Credentials credentials;
+ private byte[] serializedCredentials;
+ private transient UserGroupInformation ugi;
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
JobConf hdfsConf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
+ credentials = HDFSUtils.configureHadoopAuthentication(configuration, hdfsConf);
+ try {
+ if (credentials != null) {
+ serializedCredentials = HDFSUtils.serialize(credentials);
+ ugi = UserGroupInformation.createRemoteUser(UUID.randomUUID().toString());
+ ugi.addCredentials(credentials);
+ }
+ } catch (IOException ex) {
+ throw HyracksDataException.create(ex);
+ }
configureHdfsConf(hdfsConf, configuration);
}
@@ -103,13 +120,20 @@
return HDFSUtils.configureHDFSJobConf(configuration);
}
- protected void configureHdfsConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
+ protected void configureHdfsConf(JobConf conf, Map<String, String> configuration)
+ throws AlgebricksException, HyracksDataException {
String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
try {
confFactory = new ConfFactory(conf);
clusterLocations = getPartitionConstraint();
int numPartitions = clusterLocations.getLocations().length;
- InputSplit[] configInputSplits = getInputSplits(conf, numPartitions);
+ InputSplit[] configInputSplits;
+ if (credentials != null) {
+ configInputSplits =
+ ugi.doAs((PrivilegedExceptionAction<InputSplit[]>) () -> getInputSplits(conf, numPartitions));
+ } else {
+ configInputSplits = getInputSplits(conf, numPartitions);
+ }
readSchedule = hdfsScheduler.getLocationConstraints(configInputSplits);
inputSplitsFactory = new InputSplitsFactory(configInputSplits);
read = new boolean[readSchedule.length];
@@ -125,6 +149,8 @@
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
this.recordClass = char[].class;
}
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
} catch (IOException e) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
} catch (Exception e) {
@@ -157,7 +183,7 @@
public AsterixInputStream createInputStream(IHyracksTaskContext ctx) throws HyracksDataException {
try {
restoreConfig(ctx);
- return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration);
+ return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, ugi);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -169,6 +195,16 @@
inputSplits = inputSplitsFactory.getSplits();
nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
configured = true;
+ try {
+ if (serializedCredentials != null) {
+ credentials = new Credentials();
+ HDFSUtils.deserialize(serializedCredentials, credentials);
+ ugi = UserGroupInformation.createRemoteUser(UUID.randomUUID().toString());
+ ugi.addCredentials(credentials);
+ }
+ } catch (IOException ex) {
+ throw HyracksDataException.create(ex);
+ }
}
}
@@ -237,7 +273,8 @@
*/
readerConf = confFactory.getConf();
}
- return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, context);
+ return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, context,
+ ugi);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -262,12 +299,12 @@
private static IRecordReader<?> createRecordReader(Map<String, String> configuration, boolean[] read,
InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
- IExternalDataRuntimeContext context) {
- if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim())
+ IExternalDataRuntimeContext context, UserGroupInformation ugi) {
+ if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim()
.equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) {
- return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, context);
+ return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, context, ugi);
} else {
- return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf);
+ return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, ugi);
}
}
}
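
Every HDFS call on the read path is now funneled through a UserGroupInformation built from the deserialized delegation tokens, so the Hadoop client authenticates with the tokens rather than the process identity. A minimal sketch of that recurring doAs pattern (class and method names here are illustrative, not part of the patch):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    final class DoAsSketch {
        // Run a filesystem call as a synthetic remote user that carries only the
        // delegation tokens; no Kerberos login or password is needed at this point.
        static FileSystem openSecured(Configuration conf, Credentials tokens) throws Exception {
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser("scratch-user");
            ugi.addCredentials(tokens);
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
        }
    }

The random UUID user name in the factory serves the same purpose as "scratch-user" above: the identity string is irrelevant because authorization rides entirely on the tokens.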
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
index 11241f1..04c80d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.UserGroupInformation;
public abstract class AbstractHDFSRecordReader<K, V> implements IRecordReader<V> {
protected RecordReader<K, V> reader;
@@ -43,9 +44,11 @@
protected JobConf conf;
protected IRawRecord<V> record;
private boolean firstInputSplit;
+ protected UserGroupInformation ugi;
public AbstractHDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf) {
+ JobConf conf, UserGroupInformation ugi) {
+ this.ugi = ugi;
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
@@ -58,7 +61,8 @@
}
public AbstractHDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- IRawRecord<V> record, JobConf conf) {
+ IRawRecord<V> record, JobConf conf, UserGroupInformation ugi) {
+ this.ugi = ugi;
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
index 48b99d1..6d72fb5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -19,24 +19,36 @@
package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class HDFSRecordReader<K, V extends Writable> extends AbstractHDFSRecordReader<K, V> {
public HDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf) {
- super(read, inputSplits, readSchedule, nodeName, conf);
+ JobConf conf, UserGroupInformation ugi) {
+ super(read, inputSplits, readSchedule, nodeName, conf, ugi);
}
@SuppressWarnings("unchecked")
@Override
protected RecordReader<K, V> getRecordReader(int splitIndex) throws IOException {
- reader = (RecordReader<K, V>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ if (ugi != null) {
+ try {
+ reader = ugi.doAs((PrivilegedExceptionAction<RecordReader<K, V>>) () -> (RecordReader<K, V>) inputFormat
+ .getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL));
+ } catch (InterruptedException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ } else {
+ reader = (RecordReader<K, V>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ }
if (key == null) {
key = reader.createKey();
value = reader.createValue();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
index bb74cd6..a1f86a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.hdfs.parquet;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.input.record.ValueReferenceRecord;
@@ -28,6 +29,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -39,8 +42,8 @@
private final IWarningCollector warningCollector;
public ParquetFileRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, IExternalDataRuntimeContext context) {
- super(read, inputSplits, readSchedule, nodeName, new ValueReferenceRecord<>(), conf);
+ JobConf conf, IExternalDataRuntimeContext context, UserGroupInformation ugi) {
+ super(read, inputSplits, readSchedule, nodeName, new ValueReferenceRecord<>(), conf, ugi);
this.warningCollector = context.getTaskContext().getWarningCollector();
((MapredParquetInputFormat) inputFormat).setValueEmbedder(context.getValueEmbedder());
}
@@ -61,11 +64,20 @@
@Override
protected RecordReader<Void, V> getRecordReader(int splitIndex) throws IOException {
try {
- ParquetRecordReaderWrapper readerWrapper = (ParquetRecordReaderWrapper) inputFormat
- .getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ ParquetRecordReaderWrapper readerWrapper;
+ if (ugi != null) {
+ readerWrapper = ugi
+ .doAs((PrivilegedExceptionAction<ParquetRecordReaderWrapper>) () -> (ParquetRecordReaderWrapper) inputFormat
+ .getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL));
+ } else {
+ readerWrapper = (ParquetRecordReaderWrapper) inputFormat.getRecordReader(inputSplits[splitIndex], conf,
+ Reporter.NULL);
+ }
reader = (RecordReader<Void, V>) readerWrapper;
} catch (AsterixParquetRuntimeException e) {
throw e.getHyracksDataException();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
}
if (value == null) {
value = reader.createValue();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 46c2102..b81d815 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.stream;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -31,6 +32,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class HDFSInputStream extends AsterixInputStream {
@@ -45,10 +48,13 @@
private String nodeName;
private JobConf conf;
private int pos = 0;
+ private UserGroupInformation ugi;
@SuppressWarnings("unchecked")
public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration) throws IOException, AsterixException {
+ JobConf conf, Map<String, String> configuration, UserGroupInformation ugi)
+ throws IOException, AsterixException {
+ this.ugi = ugi;
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
@@ -164,7 +170,19 @@
@SuppressWarnings("unchecked")
private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException {
- reader = (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ if (ugi != null) {
+ try {
+ reader = ugi
+ .doAs((PrivilegedExceptionAction<RecordReader<Object, Text>>) () -> (RecordReader<Object, Text>) inputFormat
+ .getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL));
+ } catch (InterruptedException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ } else {
+ reader = (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf,
+ Reporter.NULL);
+ }
+
if (key == null) {
key = reader.createKey();
value = reader.createValue();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1329a29..2bc65f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -161,6 +161,7 @@
public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = "AZUREDATALAKE";
public static final String KEY_ADAPTER_NAME_GCS = "GCS";
+ public static final String KEY_ADAPTER_NAME_HDFS = "HDFS";
/**
* HDFS class names
@@ -178,6 +179,29 @@
public static final String INPUT_FORMAT_TEXT = "text-input-format";
public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
public static final String INPUT_FORMAT_PARQUET = "parquet-input-format";
+
+ public static final String HDFS_BLOCKSIZE = "blocksize";
+ public static final String HDFS_REPLICATION = "replication";
+ public static final String HADOOP_AUTHENTICATION = "authentication";
+ public static final String KERBEROS_PROTOCOL = "kerberos";
+ public static final String KERBEROS_REALM = "realm";
+ public static final String KERBEROS_KDC = "kdc";
+ public static final String HDFS_USE_DATANODE_HOSTNAME = "use-datanode-hostname";
+ public static final String KERBEROS_PRINCIPAL = "principal";
+ public static final String KERBEROS_PASSWORD = "password";
+
+ public static final String KEY_HDFS_BLOCKSIZE = "dfs.blocksize";
+ public static final String KEY_HDFS_REPLICATION = "dfs.replication";
+ public static final String KEY_HADOOP_AUTHENTICATION = "hadoop.security.authentication";
+ public static final String KEY_KERBEROS_CONF = "java.security.krb5.conf";
+ public static final String KEY_NAMENODE_PRINCIPAL_PATTERN = "dfs.namenode.kerberos.principal.pattern";
+ public static final String KEY_HDFS_USE_DATANODE_HOSTNAME = "dfs.client.use.datanode.hostname";
+ public static final String KERBEROS_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule";
+ public static final String KERBEROS_CONFIG_REFRESH = "refreshKrb5Config";
+ public static final String KERBEROS_CONFIG_FILE_CONTENT =
+ "[libdefaults]\n\tdefault_realm = %1$s\n\n[realms]\n\t%1$s = {\n\t\tkdc = %2$s\n\t}";
+ public static final String[] KERBEROS_CONFIG_FILE_PATTERN = { "krb5", ".conf" };
+
/**
* Builtin streams
*/
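
KERBEROS_CONFIG_FILE_CONTENT is a String.format template (note that %1$s is substituted twice). Rendered with illustrative values, it produces the minimal krb5.conf that HDFSUtils writes to a temporary file before the JAAS login; the realm and KDC below are placeholders, not shipped defaults:

    String krb5 = String.format(
            "[libdefaults]\n\tdefault_realm = %1$s\n\n[realms]\n\t%1$s = {\n\t\tkdc = %2$s\n\t}",
            "EXAMPLE.COM", "kdc.example.com:88");
    // Resulting file content:
    // [libdefaults]
    //     default_realm = EXAMPLE.COM
    //
    // [realms]
    //     EXAMPLE.COM = {
    //         kdc = kdc.example.com:88
    //     }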
@@ -358,7 +382,7 @@
static {
WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, FORMAT_PARQUET, FORMAT_CSV_LOWER_CASE);
WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
- KEY_ADAPTER_NAME_GCS.toLowerCase());
+ KEY_ADAPTER_NAME_GCS.toLowerCase(), KEY_ADAPTER_NAME_HDFS.toLowerCase());
TEXTUAL_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
PARQUET_WRITER_SUPPORTED_COMPRESSION =
Set.of(KEY_COMPRESSION_GZIP, KEY_COMPRESSION_SNAPPY, KEY_COMPRESSION_ZSTD);
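
The writer-side adapter check behind ASX1188 is a lower-cased set-membership test, which is why the expected error in the S3 test suite above now lists [gcs, hdfs, localfs, s3]. A sketch of the check (not the production code path):

    import java.util.Set;

    final class WriterAdapterSketch {
        static boolean isSupportedWriter(String adapterName) {
            return Set.of("localfs", "s3", "gcs", "hdfs").contains(adapterName.toLowerCase());
        }
    }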
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c362f75..6ba618d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -698,6 +698,9 @@
case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
validateProperties(configuration, srcLoc, collector);
break;
+ case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
+ HDFSUtils.validateProperties(configuration, srcLoc, collector);
+ break;
default:
// Nothing needs to be done
break;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 6bc013a..6d004d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -18,8 +18,11 @@
*/
package org.apache.asterix.external.util;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -27,14 +30,29 @@
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.Files;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -45,22 +63,29 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -223,9 +248,151 @@
configuration.get(ExternalDataConstants.S3A_CHANGE_DETECTION_REQUIRED));
}
+ String useDatanodeHostname = configuration.get(ExternalDataConstants.HDFS_USE_DATANODE_HOSTNAME);
+ if (useDatanodeHostname != null) {
+ conf.set(ExternalDataConstants.KEY_HDFS_USE_DATANODE_HOSTNAME, useDatanodeHostname);
+ }
return conf;
}
+ public static Configuration configureHDFSwrite(Map<String, String> configuration) {
+ Configuration conf = new Configuration();
+ String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+ String blocksize = configuration.get(ExternalDataConstants.HDFS_BLOCKSIZE);
+ String replication = configuration.get(ExternalDataConstants.HDFS_REPLICATION);
+ String useDatanodeHostname = configuration.get(ExternalDataConstants.HDFS_USE_DATANODE_HOSTNAME);
+ if (url != null) {
+ conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, url);
+ }
+ if (blocksize != null) {
+ conf.set(ExternalDataConstants.KEY_HDFS_BLOCKSIZE, blocksize);
+ }
+ if (replication != null) {
+ conf.set(ExternalDataConstants.KEY_HDFS_REPLICATION, replication);
+ }
+ if (useDatanodeHostname != null) {
+ conf.set(ExternalDataConstants.KEY_HDFS_USE_DATANODE_HOSTNAME, useDatanodeHostname);
+ }
+ return conf;
+ }
+
+ public static synchronized Credentials configureHadoopAuthentication(Map<String, String> configuration,
+ Configuration conf) throws AlgebricksException {
+ UserGroupInformation.reset();
+ if (Objects.equals(configuration.get(ExternalDataConstants.HADOOP_AUTHENTICATION),
+ ExternalDataConstants.KERBEROS_PROTOCOL)) {
+ conf.set(ExternalDataConstants.KEY_HADOOP_AUTHENTICATION, ExternalDataConstants.KERBEROS_PROTOCOL);
+ conf.set(ExternalDataConstants.KEY_NAMENODE_PRINCIPAL_PATTERN, "*");
+
+ String kerberosRealm = configuration.get(ExternalDataConstants.KERBEROS_REALM);
+ String kerberosKdc = configuration.get(ExternalDataConstants.KERBEROS_KDC);
+ String kerberosPrincipal = configuration.get(ExternalDataConstants.KERBEROS_PRINCIPAL);
+ String kerberosPassword = configuration.get(ExternalDataConstants.KERBEROS_PASSWORD);
+
+ javax.security.auth.login.Configuration config = new JaasConfiguration();
+ java.nio.file.Path krb5conf = null;
+ try {
+ krb5conf = createKerberosConf(kerberosRealm, kerberosKdc);
+ System.setProperty(ExternalDataConstants.KEY_KERBEROS_CONF, krb5conf.toString());
+
+ Subject subject = new Subject();
+ LoginContext loginContext =
+ new LoginContext("", subject, new callbackHandler(kerberosPrincipal, kerberosPassword), config);
+ loginContext.login();
+
+ // Hadoop libraries use rules to ensure that a principal is associated with a realm.
+ // The default realm is static and needs to be modified in order to work with a different realm.
+ KerberosName.resetDefaultRealm();
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation ugi = UserGroupInformation.getUGIFromSubject(subject);
+ Credentials credentials = new Credentials();
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ try (FileSystem fs = FileSystem.get(conf)) {
+ fs.addDelegationTokens(ugi.getUserName(), credentials);
+ }
+ return null;
+ });
+ loginContext.logout();
+ return credentials;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw CompilationException.create(ErrorCode.COULD_NOT_CREATE_TOKENS);
+ } catch (LoginException | IOException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ } finally {
+ System.clearProperty(ExternalDataConstants.KEY_KERBEROS_CONF);
+ UserGroupInformation.reset();
+ if (krb5conf != null) {
+ FileUtils.deleteQuietly(krb5conf.toFile());
+ }
+ }
+ }
+ return null;
+ }
+
+ private static java.nio.file.Path createKerberosConf(String kerberosRealm, String kerberosKdc)
+ throws AlgebricksException {
+ java.nio.file.Path krb5conf;
+ try {
+ krb5conf = Files.createTempFile(ExternalDataConstants.KERBEROS_CONFIG_FILE_PATTERN[0],
+ ExternalDataConstants.KERBEROS_CONFIG_FILE_PATTERN[1]);
+ Files.writeString(krb5conf,
+ String.format(ExternalDataConstants.KERBEROS_CONFIG_FILE_CONTENT, kerberosRealm, kerberosKdc));
+ } catch (IOException ex) {
+ throw AsterixException.create(ErrorCode.COULD_NOT_CREATE_FILE,
+ String.join("", ExternalDataConstants.KERBEROS_CONFIG_FILE_PATTERN));
+ }
+ return krb5conf;
+ }
+
+ private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ Map<String, Object> options =
+ Map.of(ExternalDataConstants.KERBEROS_CONFIG_REFRESH, ExternalDataConstants.TRUE);
+
+ return new AppConfigurationEntry[] { new AppConfigurationEntry(ExternalDataConstants.KERBEROS_LOGIN_MODULE,
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options) };
+ }
+ }
+
+ public static byte[] serialize(Writable writable) throws IOException {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(bos)) {
+ writable.write(oos);
+ return bos.toByteArray();
+ }
+ }
+
+ public static void deserialize(byte[] data, Writable writable) throws IOException {
+ try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(data))) {
+ writable.readFields(din);
+ }
+ }
+
+ private static class callbackHandler implements CallbackHandler {
+ private final String principal;
+ private final String password;
+
+ public callbackHandler(String principal, String password) {
+ this.principal = principal;
+ this.password = password;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ ((NameCallback) callback).setName(principal);
+ } else if (callback instanceof PasswordCallback) {
+ ((PasswordCallback) callback).setPassword(password.toCharArray());
+ } else {
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+ }
+
private static void configureParquet(Map<String, String> configuration, JobConf conf) {
//Parquet configurations
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, ParquetReadSupport.class.getName());
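
serialize and deserialize above are thin wrappers over the Writable contract of Credentials; they are what lets the tokens minted at compile time travel inside the Java-serialized factories to the NCs, where the transient Credentials and UGI fields are rebuilt on arrival. A round-trip sketch using only the patch's own helpers:

    import java.io.IOException;

    import org.apache.asterix.external.util.HDFSUtils;
    import org.apache.hadoop.security.Credentials;

    final class TokenRoundTripSketch {
        static Credentials roundTrip(Credentials minted) throws IOException {
            byte[] wire = HDFSUtils.serialize(minted);   // Writable -> byte[]
            Credentials restored = new Credentials();
            HDFSUtils.deserialize(wire, restored);       // byte[] -> tokens restored
            return restored;
        }
    }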
@@ -356,4 +523,41 @@
public static boolean isEmpty(JobConf job) {
return job.get(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "").isEmpty();
}
+
+ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector) throws CompilationException {
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+ if (configuration.get(ExternalDataConstants.KEY_PATH) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_PATH);
+ }
+
+ if (Objects.equals(configuration.get(ExternalDataConstants.HADOOP_AUTHENTICATION),
+ ExternalDataConstants.KERBEROS_PROTOCOL)) {
+ String kerberosRealm = configuration.get(ExternalDataConstants.KERBEROS_REALM);
+ String kerberosKdc = configuration.get(ExternalDataConstants.KERBEROS_KDC);
+ String kerberosPrincipal = configuration.get(ExternalDataConstants.KERBEROS_PRINCIPAL);
+ String kerberosPassword = configuration.get(ExternalDataConstants.KERBEROS_PASSWORD);
+
+ if (kerberosRealm == null) {
+ throw CompilationException.create(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
+ ExternalDataConstants.KERBEROS_REALM, ExternalDataConstants.HADOOP_AUTHENTICATION);
+ }
+ if (kerberosKdc == null) {
+ throw CompilationException.create(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
+ ExternalDataConstants.KERBEROS_KDC, ExternalDataConstants.HADOOP_AUTHENTICATION);
+ }
+ if (kerberosPrincipal == null) {
+ throw CompilationException.create(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
+ ExternalDataConstants.KERBEROS_PRINCIPAL, ExternalDataConstants.HADOOP_AUTHENTICATION);
+ }
+ if (kerberosPassword == null) {
+ throw CompilationException.create(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
+ ExternalDataConstants.KERBEROS_PASSWORD, ExternalDataConstants.HADOOP_AUTHENTICATION);
+ }
+ }
+
+ validateIncludeExclude(configuration);
+ }
}
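
Taken together, a Kerberos-secured HDFS source or sink must supply all four Kerberos parameters alongside the always-required format and path. A property set that would pass validateProperties, with placeholder values ("format" and "path" assume the usual spellings of KEY_FORMAT and KEY_PATH):

    import java.util.Map;

    final class KerberosPropsSketch {
        static final Map<String, String> PROPS = Map.of(
                "format", "parquet",               // KEY_FORMAT (required)
                "path", "output/results",          // KEY_PATH (required)
                "authentication", "kerberos",      // HADOOP_AUTHENTICATION
                "realm", "EXAMPLE.COM",            // KERBEROS_REALM
                "kdc", "kdc.example.com:88",       // KERBEROS_KDC
                "principal", "writer@EXAMPLE.COM", // KERBEROS_PRINCIPAL
                "password", "changeit");           // KERBEROS_PASSWORD
    }

Omitting any one of the last four raises REQUIRED_PARAM_IF_PARAM_IS_PRESENT against "authentication".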
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
new file mode 100644
index 0000000..18865d0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class HDFSExternalFileWriter implements IExternalFileWriter {
+
+ private final IExternalPrinter printer;
+ private final FileSystem fs;
+ private final boolean partitionedPath;
+ private final SourceLocation pathSourceLocation;
+ private FSDataOutputStream outputStream = null;
+
+ HDFSExternalFileWriter(IExternalPrinter printer, FileSystem fs, boolean partitionedPath,
+ SourceLocation pathSourceLocation) {
+ this.printer = printer;
+ this.fs = fs;
+ this.partitionedPath = partitionedPath;
+ this.pathSourceLocation = pathSourceLocation;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void validate(String directory) throws HyracksDataException {
+ if (partitionedPath) {
+ Path dirPath = new Path(directory);
+ try {
+ if (fs.exists(dirPath)) {
+ FileStatus fileStatus = fs.getFileStatus(dirPath);
+ if (fileStatus.isFile()) {
+ throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, directory);
+ }
+ if (fileStatus.isDirectory()) {
+ FileStatus[] fileStatuses = fs.listStatus(dirPath);
+ if (fileStatuses.length != 0) {
+ throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation,
+ directory);
+ }
+ }
+ }
+ } catch (IOException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ }
+ }
+
+ @Override
+ public boolean newFile(String directory, String fileName) throws HyracksDataException {
+ Path path = new Path(directory, fileName);
+ try {
+ outputStream = fs.create(path, false);
+ printer.newStream(outputStream);
+ } catch (FileAlreadyExistsException e) {
+ return false;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return true;
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ printer.print(value);
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ if (outputStream != null) {
+ outputStream.abort();
+ }
+ printer.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ printer.close();
+ }
+}
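
newFile relies on the atomic create-without-overwrite contract: FileSystem.create(path, false) fails with FileAlreadyExistsException instead of truncating, and the writer maps that to a false return so the caller can pick a different file name. A standalone illustration of the contract (hypothetical path; behavior per the Hadoop FileSystem specification):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class NoOverwriteSketch {
        static void demo() throws Exception {
            FileSystem fs = FileSystem.get(new Configuration()); // local FS by default
            Path p = new Path("/tmp/copy-to-demo");
            try (FSDataOutputStream out = fs.create(p, false)) { // overwrite = false
                out.writeBytes("first create wins\n");
            }
            // A second fs.create(p, false) on the same path now fails with
            // org.apache.hadoop.fs.FileAlreadyExistsException.
        }
    }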
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..761131f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
+
+public class HDFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+ private static final long serialVersionUID = 1L;
+ private static final char SEPARATOR = '/';
+ public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+ return new HDFSExternalFileWriterFactory(configuration);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
+
+ private final Map<String, String> configuration;
+ private final String staticPath;
+ private final SourceLocation pathSourceLocation;
+ private transient Credentials credentials;
+ private byte[] serializedCredentials;
+ private transient FileSystem fs;
+
+ private HDFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ configuration = externalConfig.getConfiguration();
+ staticPath = externalConfig.getStaticPath();
+ pathSourceLocation = externalConfig.getPathSourceLocation();
+ }
+
+ private FileSystem createFileSystem(Configuration conf) throws CompilationException, HyracksDataException {
+ try {
+ if (credentials != null) {
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(UUID.randomUUID().toString());
+ ugi.addCredentials(credentials);
+ return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
+ }
+ return FileSystem.get(conf);
+ } catch (InterruptedException ex) {
+ throw HyracksDataException.create(ex);
+ } catch (IOException ex) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+
+ private void buildFileSystem() throws HyracksDataException {
+ try {
+ synchronized (this) {
+ if (credentials == null && serializedCredentials != null) {
+ credentials = new Credentials();
+ HDFSUtils.deserialize(serializedCredentials, credentials);
+ }
+ if (fs == null) {
+ Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
+ fs = createFileSystem(conf);
+ }
+ }
+ } catch (IOException | CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+ throws HyracksDataException {
+ buildFileSystem();
+ IExternalPrinter printer = printerFactory.createPrinter();
+ return new HDFSExternalFileWriter(printer, fs, staticPath == null, pathSourceLocation);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+
+ @Override
+ public void validate() throws AlgebricksException {
+ Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
+ credentials = HDFSUtils.configureHadoopAuthentication(configuration, conf);
+ try {
+ if (credentials != null) {
+ serializedCredentials = HDFSUtils.serialize(credentials);
+ }
+ try (FileSystem testFs = createFileSystem(conf)) {
+ doValidate(testFs);
+ }
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex));
+ }
+ }
+
+ private void doValidate(FileSystem testFs) throws IOException, AlgebricksException {
+ if (staticPath != null) {
+ Path dirPath = new Path(staticPath);
+ if (testFs.exists(dirPath)) {
+ FileStatus fileStatus = testFs.getFileStatus(dirPath);
+ if (fileStatus.isFile()) {
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+ if (fileStatus.isDirectory()) {
+ FileStatus[] fileStatuses = testFs.listStatus(dirPath);
+ if (fileStatuses.length != 0) {
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation,
+ staticPath);
+ }
+ }
+ }
+ checkDirectoryWritePermission(testFs);
+ }
+ }
+
+ private void checkDirectoryWritePermission(FileSystem fs) throws AlgebricksException {
+ if (!Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION,
+ Boolean.TRUE.toString()))) {
+ return;
+ }
+ Path path = new Path(staticPath, "testFile");
+ try {
+ FSDataOutputStream outputStream = fs.create(path);
+ fs.deleteOnExit(path);
+ outputStream.write(0);
+ outputStream.close();
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index f58df7e..1caea58 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.writer.HDFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -77,6 +78,7 @@
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
+ addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS, HDFSExternalFileWriterFactory.PROVIDER);
}
private static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 94bd6e1..62bec6b 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -96,7 +96,7 @@
<log4j.version>2.22.1</log4j.version>
<awsjavasdk.version>2.24.9</awsjavasdk.version>
<awsjavasdk.crt.version>0.29.10</awsjavasdk.crt.version>
- <parquet.version>1.14.1</parquet.version>
+ <parquet.version>1.14.3</parquet.version>
<hadoop-awsjavasdk.version>1.12.637</hadoop-awsjavasdk.version>
<azureblobjavasdk.version>12.25.1</azureblobjavasdk.version>
<azurecommonjavasdk.version>12.24.1</azurecommonjavasdk.version>