[ASTERIXDB-3255][RT] Embed filter values in Parquet
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Add support for embedding filter values when
assembling Parquet columns.
Change-Id: I537133639a464ab46f60e2aa27ae68ab203c1989
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17776
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/Jones/0.json b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/Jones/0.json
new file mode 100644
index 0000000..3cec0e4
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/Jones/0.json
@@ -0,0 +1,3 @@
+{"id": 2, "department": "accounting", "name": {"first": "Mike", "last": "Jones"}}
+{"id": 5, "department": "engineering", "name": {"first": "Alice"}}
+{"id": 8, "department": "hr", "name": null}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/miller/0.json b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/miller/0.json
new file mode 100644
index 0000000..c587039
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/miller/0.json
@@ -0,0 +1,3 @@
+{"id": 3, "department": "accounting", "name": {"first": "Alex", "last": "Miller"}}
+{"id": 6, "department": "engineering", "name": {}}
+{"id": 9, "department": "hr"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/smith/0.json b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/smith/0.json
new file mode 100644
index 0000000..972c531
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/external-filter/embed/last-name-parquet/smith/0.json
@@ -0,0 +1,3 @@
+{"id": 1, "department": "accounting", "name": {"first": "John", "last": "Smith"}}
+{"id": 4, "department": "engineering", "name": {"first": "Tom", "last": "Smith"}}
+{"id": 7, "department": "hr", "name": {"first": "James", "last": "Smith"}}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/embed-multiple-values/embed-multiple-values.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/embed-multiple-values/embed-multiple-values.001.query.sqlpp
index 2907baa..e9df77e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/embed-multiple-values/embed-multiple-values.001.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/embed-multiple-values/embed-multiple-values.001.query.sqlpp
@@ -19,7 +19,7 @@
USE test;
-SELECT value t
+SELECT VALUE t
FROM maintenance t
ORDER BY t.customer_id,
t.company,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.000.ddl.sqlpp
new file mode 100644
index 0000000..c1fc940
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.000.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE test AS {
+};
+
+CREATE EXTERNAL DATASET maintenance(test) USING %adapter% (
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/external-filter/embed/car/{company:string}/customer/{customer_id:int}/maintenance-report/year-{year:int}-month-{month:int}-day-{day:int}-date"),
+ ("embed-filter-values" = "true"),
+ ("format"="parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.query.sqlpp
new file mode 100644
index 0000000..e9df77e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE t
+FROM maintenance t
+ORDER BY t.customer_id,
+ t.company,
+ t.year,
+ t.month,
+ t.day
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.query.sqlpp
new file mode 100644
index 0000000..3c99576
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE t
+FROM maintenance t
+WHERE t.company = "ford"
+ AND t.customer_id = 1
+ AND t.year = 2001
+ AND t.month = 01
+ AND t.day = 01;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.000.ddl.sqlpp
new file mode 100644
index 0000000..014bb6c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.000.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE TestType AS {
+};
+
+
+CREATE EXTERNAL DATASET Department(TestType) USING %adapter% (
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/external-filter/embed/department/{department:string}"),
+ ("embed-filter-values" = "true"),
+ ("format"="parquet")
+);
+
+CREATE EXTERNAL DATASET LastName(TestType) USING %adapter% (
+ %template%,
+ ("container"="playground"),
+ ("definition"="parquet-data/external-filter/embed/last-name-parquet/{name.last:string}"),
+ ("embed-filter-values" = "true"),
+ ("format"="parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.010.query.sqlpp
new file mode 100644
index 0000000..0034525
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.010.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.011.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.011.query.sqlpp
new file mode 100644
index 0000000..b5856ac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.011.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.020.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.020.query.sqlpp
new file mode 100644
index 0000000..a0919a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.020.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ AND d.name.last = "Smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.021.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.021.query.sqlpp
new file mode 100644
index 0000000..3eb867c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.021.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ AND d.name.last = "Smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.030.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.030.query.sqlpp
new file mode 100644
index 0000000..c24bf2c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.030.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ OR d.name.last = "Smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.031.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.031.query.sqlpp
new file mode 100644
index 0000000..e1c0b02
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.031.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM Department d
+WHERE d.department = "accounting"
+ OR d.name.last = "Smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.040.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.040.query.sqlpp
new file mode 100644
index 0000000..e3e6ee5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.040.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM Department d
+WHERE uppercase(d.department) = "H" || "R"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.041.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.041.query.sqlpp
new file mode 100644
index 0000000..bfa65b5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.041.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM Department d
+WHERE uppercase(d.department) = "H" || "R"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.110.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.110.query.sqlpp
new file mode 100644
index 0000000..349ba83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.110.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM LastName d
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.111.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.111.query.sqlpp
new file mode 100644
index 0000000..1beac09
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.111.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM LastName d
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.120.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.120.query.sqlpp
new file mode 100644
index 0000000..7ae11e6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.120.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM LastName d
+WHERE d.name.last = "Jones"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.121.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.121.query.sqlpp
new file mode 100644
index 0000000..e26b35a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.121.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM LastName d
+WHERE d.name.last = "Jones"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.130.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.130.query.sqlpp
new file mode 100644
index 0000000..c4d575d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.130.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE d
+FROM LastName d
+WHERE lowercase(d.name.first) = "john"
+ AND "john " || lowercase(d.name.last) = "john smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.131.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.131.query.sqlpp
new file mode 100644
index 0000000..63dbd7c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/embed-one-value.131.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE d
+FROM LastName d
+WHERE lowercase(d.name.first) = "john"
+ AND "john " || lowercase(d.name.last) = "john smith"
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.adm
new file mode 100644
index 0000000..1fcdcfa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.001.adm
@@ -0,0 +1,81 @@
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 1, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 1, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 2, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 2, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 2, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "ford" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "ford" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "lexus" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "lexus" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "lexus" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 3, "year": 2001, "month": 1, "day": 1, "company": "toyota" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 3, "year": 2002, "month": 2, "day": 2, "company": "toyota" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
+{ "customer_id": 3, "year": 2003, "month": 3, "day": 3, "company": "toyota" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.adm
new file mode 100644
index 0000000..eff14d1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-multiple-values/embed-multiple-values.002.adm
@@ -0,0 +1,3 @@
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
+{ "customer_id": 1, "year": 2001, "month": 1, "day": 1, "company": "ford" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.010.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.010.adm
new file mode 100644
index 0000000..7cb9062
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.010.adm
@@ -0,0 +1,3 @@
+{ "id": 1, "name": { "first": "John", "last": "Smith" }, "department": "accounting" }
+{ "id": 2, "name": { "first": "Mike", "last": "Jones" }, "department": "accounting" }
+{ "id": 3, "name": { "first": "Alex", "last": "Miller" }, "department": "accounting" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
new file mode 100644
index 0000000..b560a5b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
@@ -0,0 +1,24 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$17(ASC) ] |PARTITIONED|
+ order (ASC, $$17) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$17(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (eq($$d.getField("department"), "accounting")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$17] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.020.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.020.adm
new file mode 100644
index 0000000..8bfbbb3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.020.adm
@@ -0,0 +1 @@
+{ "id": 1, "name": { "first": "John", "last": "Smith" }, "department": "accounting" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
new file mode 100644
index 0000000..2bfad06
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
@@ -0,0 +1,24 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$22(ASC) ] |PARTITIONED|
+ order (ASC, $$22) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$22(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$22] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.030.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.030.adm
new file mode 100644
index 0000000..7d601d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.030.adm
@@ -0,0 +1,5 @@
+{ "id": 1, "name": { "first": "John", "last": "Smith" }, "department": "accounting" }
+{ "id": 2, "name": { "first": "Mike", "last": "Jones" }, "department": "accounting" }
+{ "id": 3, "name": { "first": "Alex", "last": "Miller" }, "department": "accounting" }
+{ "id": 4, "name": { "first": "Tom", "last": "Smith" }, "department": "engineering" }
+{ "id": 7, "name": { "first": "James", "last": "Smith" }, "department": "hr" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
new file mode 100644
index 0000000..afb74f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
@@ -0,0 +1,24 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$22(ASC) ] |PARTITIONED|
+ order (ASC, $$22) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$22(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$22] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.040.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.040.adm
new file mode 100644
index 0000000..3bfb1ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.040.adm
@@ -0,0 +1,3 @@
+{ "id": 7, "name": { "first": "James", "last": "Smith" }, "department": "hr" }
+{ "id": 8, "name": { "first": "David", "last": "Jones" }, "department": "hr" }
+{ "id": 9, "name": { "first": "Noah", "last": "Miller" }, "department": "hr" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.041.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.041.plan
new file mode 100644
index 0000000..6f2ecf3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.041.plan
@@ -0,0 +1,24 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$20(ASC) ] |PARTITIONED|
+ order (ASC, $$20) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$20(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (eq(uppercase($$d.getField("department")), "HR")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$20] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq(uppercase($$d.getField("department")), "HR") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.110.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.110.adm
new file mode 100644
index 0000000..de64450
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.110.adm
@@ -0,0 +1,9 @@
+{ "id": 1, "department": "accounting", "name": { "first": "John", "last": "smith" } }
+{ "id": 2, "department": "accounting", "name": { "first": "Mike", "last": "Jones" } }
+{ "id": 3, "department": "accounting", "name": { "first": "Alex", "last": "miller" } }
+{ "id": 4, "department": "engineering", "name": { "first": "Tom", "last": "smith" } }
+{ "id": 5, "department": "engineering", "name": { "first": "Alice", "last": "Jones" } }
+{ "id": 6, "department": "engineering", "name": { "last": "miller" } }
+{ "id": 7, "department": "hr", "name": { "first": "James", "last": "smith" } }
+{ "id": 8, "department": "hr", "name": { "last": "Jones" } }
+{ "id": 9, "department": "hr", "name": { "last": "miller" } }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.111.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.111.plan
new file mode 100644
index 0000000..ff27bfe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.111.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$14(ASC) ] |PARTITIONED|
+ order (ASC, $$14) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$14] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.LastName embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.120.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.120.adm
new file mode 100644
index 0000000..6d3c45f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.120.adm
@@ -0,0 +1,3 @@
+{ "id": 2, "department": "accounting", "name": { "first": "Mike", "last": "Jones" } }
+{ "id": 5, "department": "engineering", "name": { "first": "Alice", "last": "Jones" } }
+{ "id": 8, "department": "hr", "name": { "last": "Jones" } }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
new file mode 100644
index 0000000..fc48056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
@@ -0,0 +1,24 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$18(ASC) ] |PARTITIONED|
+ order (ASC, $$18) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$18(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (eq($$d.getField("name").getField("last"), "Jones")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$18] <- [$$d.getField("id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.130.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.130.adm
new file mode 100644
index 0000000..1060822
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.130.adm
@@ -0,0 +1 @@
+{ "id": 1, "department": "accounting", "name": { "first": "John", "last": "smith" } }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.131.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.131.plan
new file mode 100644
index 0000000..4edf24e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.131.plan
@@ -0,0 +1,26 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SORT_MERGE_EXCHANGE [$$28(ASC) ] |PARTITIONED|
+ order (ASC, $$28) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STABLE_SORT [$$28(ASC)] |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$d, $$28]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ select (and(eq(lowercase($$25.getField("first")), "john"), eq(string-concat(ordered-list-constructor("john ", lowercase($$25.getField("last")))), "john smith"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |PARTITIONED|
+ assign [$$28, $$25] <- [$$d.getField("id"), $$d.getField("name")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq(string-concat(ordered-list-constructor("john ", lowercase($$d.getField("name").getField("last")))), "john smith") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 132a905..02353f7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -278,6 +278,18 @@
<output-dir compare="Text">query</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset/common/dynamic-prefixes/parquet">
+ <compilation-unit name="embed-one-value">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">embed-one-value</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset/common/dynamic-prefixes/parquet">
+ <compilation-unit name="embed-multiple-values">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">embed-multiple-values</output-dir>
+ </compilation-unit>
+ </test-case>
<!-- Dynamic prefixes tests end -->
<test-case FilePath="external-dataset">
<compilation-unit name="common/empty-string-definition">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 46afdbe..edd42f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -265,8 +265,7 @@
IExternalDataRuntimeContext context) {
if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim())
.equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) {
- IWarningCollector warningCollector = context.getTaskContext().getWarningCollector();
- return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, warningCollector);
+ return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, context);
} else {
return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index d3ad968..7b10e09 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -25,22 +25,16 @@
import java.io.IOException;
import java.util.List;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
-import org.apache.parquet.hadoop.ParquetRecordReader;
/**
* For the Original implementation, see {@code DeprecatedParquetInputFormat}
@@ -53,12 +47,13 @@
*/
public class MapredParquetInputFormat extends org.apache.hadoop.mapred.FileInputFormat<Void, VoidPointable> {
- protected ParquetInputFormat<ArrayBackedValueStorage> realInputFormat = new ParquetInputFormat<>();
+ private final ParquetInputFormat<ArrayBackedValueStorage> realInputFormat = new ParquetInputFormat<>();
+ private IExternalFilterValueEmbedder valueEmbedder;
@Override
public RecordReader<Void, VoidPointable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
throws IOException {
- return new RecordReaderWrapper(split, job, reporter);
+ return new ParquetRecordReaderWrapper(split, job, reporter, valueEmbedder);
}
@Override
@@ -84,130 +79,15 @@
return realInputFormat.getFooters(job, asList(super.listStatus(job)));
}
- private static class RecordReaderWrapper implements RecordReader<Void, VoidPointable> {
-
- private final ParquetRecordReader<IValueReference> realReader;
- private final long splitLen; // for getPos()
-
- private final VoidPointable valueContainer;
-
- private boolean firstRecord;
- private boolean eof;
-
- public RecordReaderWrapper(InputSplit oldSplit, JobConf oldJobConf, Reporter reporter) throws IOException {
- splitLen = oldSplit.getLength();
-
- try {
- realReader = new ParquetRecordReader<>(
- ParquetInputFormat.<IValueReference> getReadSupportInstance(oldJobConf),
- ParquetInputFormat.getFilter(oldJobConf));
-
- if (oldSplit instanceof ParquetInputSplitWrapper) {
- realReader.initialize(((ParquetInputSplitWrapper) oldSplit).realSplit, oldJobConf, reporter);
- } else if (oldSplit instanceof FileSplit) {
- realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
- } else {
- throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
- LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
- }
- valueContainer = new VoidPointable();
- firstRecord = false;
- eof = false;
- // read once to gain access to key and value objects
- if (realReader.nextKeyValue()) {
- firstRecord = true;
- valueContainer.set(realReader.getCurrentValue());
-
- } else {
- eof = true;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (HyracksDataException | AsterixParquetRuntimeException e) {
- throw e;
- } catch (Exception e) {
- if (e.getMessage() != null && e.getMessage().contains("not a Parquet file")) {
- throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
- LogRedactionUtil.userData(getPath(oldSplit)), "not a Parquet file");
- }
-
- throw RuntimeDataException.create(ErrorCode.UNEXPECTED_ERROR_ENCOUNTERED,
- LogRedactionUtil.userData(e.toString()));
- }
- }
-
- private String getPath(InputSplit split) {
- if (split instanceof FileSplit) {
- return ((FileSplit) split).getPath().toString();
- } else if (split instanceof ParquetInputSplitWrapper) {
- return ((ParquetInputSplitWrapper) split).realSplit.getPath().toString();
- } else {
- return split.toString();
- }
- }
-
- @Override
- public void close() throws IOException {
- realReader.close();
- }
-
- @Override
- public Void createKey() {
- return null;
- }
-
- @Override
- public VoidPointable createValue() {
- return valueContainer;
- }
-
- @Override
- public long getPos() throws IOException {
- return (long) (splitLen * getProgress());
- }
-
- @Override
- public float getProgress() throws IOException {
- try {
- return realReader.getProgress();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean next(Void key, VoidPointable value) throws IOException {
- if (eof) {
- return false;
- }
-
- if (firstRecord) { // key & value are already read.
- firstRecord = false;
- value.set(valueContainer);
- return true;
- }
-
- try {
- if (realReader.nextKeyValue()) {
- if (value != null) {
- value.set(realReader.getCurrentValue());
- }
- return true;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
-
- eof = true; // strictly not required, just for consistency
- return false;
- }
+ public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
+ this.valueEmbedder = valueEmbedder;
}
public static boolean isTaskSideMetaData(JobConf job) {
return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
}
- private static class ParquetInputSplitWrapper implements InputSplit {
+ static class ParquetInputSplitWrapper implements InputSplit {
ParquetInputSplit realSplit;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
index 9c1d70a..bb74cd6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.input.record.ValueReferenceRecord;
import org.apache.asterix.external.input.record.reader.hdfs.AbstractHDFSRecordReader;
import org.apache.asterix.external.util.HDFSUtils;
@@ -38,9 +39,10 @@
private final IWarningCollector warningCollector;
public ParquetFileRecordReader(boolean[] read, InputSplit[] inputSplits, String[] readSchedule, String nodeName,
- JobConf conf, IWarningCollector warningCollector) {
+ JobConf conf, IExternalDataRuntimeContext context) {
super(read, inputSplits, readSchedule, nodeName, new ValueReferenceRecord<>(), conf);
- this.warningCollector = warningCollector;
+ this.warningCollector = context.getTaskContext().getWarningCollector();
+ ((MapredParquetInputFormat) inputFormat).setValueEmbedder(context.getValueEmbedder());
}
@Override
@@ -59,7 +61,9 @@
@Override
protected RecordReader<Void, V> getRecordReader(int splitIndex) throws IOException {
try {
- reader = (RecordReader<Void, V>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ ParquetRecordReaderWrapper readerWrapper = (ParquetRecordReaderWrapper) inputFormat
+ .getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
+ reader = (RecordReader<Void, V>) readerWrapper;
} catch (AsterixParquetRuntimeException e) {
throw e.getHyracksDataException();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index 797a2b2..c8a09b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.RootConverter;
import org.apache.asterix.external.util.HDFSUtils;
@@ -38,6 +39,8 @@
import org.apache.parquet.schema.MessageType;
public class ParquetReadSupport extends ReadSupport<IValueReference> {
+ private IExternalFilterValueEmbedder valueEmbedder;
+
@Override
public ReadContext init(InitContext context) {
MessageType requestedSchema = getRequestedSchema(context);
@@ -47,7 +50,12 @@
@Override
public RecordMaterializer<IValueReference> prepareForRead(Configuration configuration,
Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
- return new ADMRecordMaterializer(configuration, readContext);
+ try {
+ return new ADMRecordMaterializer(configuration, valueEmbedder, readContext);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+
}
private static MessageType getRequestedSchema(InitContext initContext) {
@@ -75,14 +83,19 @@
}
}
+ void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
+ this.valueEmbedder = valueEmbedder;
+ }
+
private static class ADMRecordMaterializer extends RecordMaterializer<IValueReference> {
private final RootConverter rootConverter;
private final List<Warning> warnings;
private final Configuration configuration;
- public ADMRecordMaterializer(Configuration configuration, ReadContext readContext) {
+ public ADMRecordMaterializer(Configuration configuration, IExternalFilterValueEmbedder valueEmbedder,
+ ReadContext readContext) throws IOException {
warnings = new ArrayList<>();
- rootConverter = new RootConverter(readContext.getRequestedSchema(), configuration, warnings);
+ rootConverter = new RootConverter(readContext.getRequestedSchema(), valueEmbedder, configuration, warnings);
this.configuration = configuration;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
new file mode 100644
index 0000000..a293ebb
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+
+public class ParquetRecordReaderWrapper implements RecordReader<Void, VoidPointable> {
+ private final ParquetRecordReader<IValueReference> realReader;
+ private final long splitLen; // for getPos()
+
+ private final VoidPointable valueContainer;
+
+ private boolean firstRecord;
+ private boolean eof;
+
+ public ParquetRecordReaderWrapper(InputSplit oldSplit, JobConf oldJobConf, Reporter reporter,
+ IExternalFilterValueEmbedder valueEmbedder) throws IOException {
+ splitLen = oldSplit.getLength();
+
+ try {
+ ReadSupport<IValueReference> readSupport = ParquetInputFormat.getReadSupportInstance(oldJobConf);
+ ParquetReadSupport parquetReadSupport = (ParquetReadSupport) readSupport;
+ parquetReadSupport.setValueEmbedder(valueEmbedder);
+ realReader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(oldJobConf));
+
+ if (oldSplit instanceof MapredParquetInputFormat.ParquetInputSplitWrapper) {
+ realReader.initialize(((MapredParquetInputFormat.ParquetInputSplitWrapper) oldSplit).realSplit,
+ oldJobConf, reporter);
+ } else if (oldSplit instanceof FileSplit) {
+ realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
+ } else {
+ throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+ LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
+ }
+
+ // Set the path for value embedder
+ valueEmbedder.setPath(getPath(oldSplit));
+
+ valueContainer = new VoidPointable();
+ firstRecord = false;
+ eof = false;
+ // read once to gain access to key and value objects
+ if (realReader.nextKeyValue()) {
+ firstRecord = true;
+ valueContainer.set(realReader.getCurrentValue());
+ } else {
+ eof = true;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (HyracksDataException | AsterixParquetRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ if (e.getMessage() != null && e.getMessage().contains("not a Parquet file")) {
+ throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+ LogRedactionUtil.userData(getPath(oldSplit)), "not a Parquet file");
+ }
+
+ throw RuntimeDataException.create(e);
+ }
+ }
+
+ private String getPath(InputSplit split) {
+ if (split instanceof FileSplit) {
+ return ((FileSplit) split).getPath().toString();
+ } else if (split instanceof MapredParquetInputFormat.ParquetInputSplitWrapper) {
+ return ((MapredParquetInputFormat.ParquetInputSplitWrapper) split).realSplit.getPath().toString();
+ } else {
+ return split.toString();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ realReader.close();
+ }
+
+ @Override
+ public Void createKey() {
+ return null;
+ }
+
+ @Override
+ public VoidPointable createValue() {
+ return valueContainer;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return (long) (splitLen * getProgress());
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ try {
+ return realReader.getProgress();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean next(Void key, VoidPointable value) throws IOException {
+ if (eof) {
+ return false;
+ }
+
+ if (firstRecord) { // key & value are already read.
+ firstRecord = false;
+ value.set(valueContainer);
+ return true;
+ }
+
+ try {
+ if (realReader.nextKeyValue()) {
+ if (value != null) {
+ value.set(realReader.getCurrentValue());
+ }
+ return true;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ eof = true; // strictly not required, just for consistency
+ return false;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
index c0ee37b..c967130 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/IFieldValue.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.api.IValueReference;
/**
@@ -25,8 +26,12 @@
* index of associated with a value.
*/
public interface IFieldValue {
+ ATypeTag getTypeTag();
+
IValueReference getFieldName();
+ String getStringFieldName();
+
/**
* @return the index of the value as appeared in the schema
*/
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
index 4982ca5..5fa46a3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
@@ -25,6 +25,8 @@
import org.apache.asterix.dataflow.data.nontagged.serde.ABinarySerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
import org.apache.asterix.external.parser.jackson.ParserContext;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -47,6 +49,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
import org.apache.hyracks.util.string.UTF8StringReader;
import org.apache.hyracks.util.string.UTF8StringWriter;
@@ -119,8 +122,21 @@
private final String timeZoneId;
private final int timeZoneOffset;
+ /*
+ * ************************************************************************
+ * Value Embedder
+ * ************************************************************************
+ */
+ private final IExternalFilterValueEmbedder valueEmbedder;
+
public ParquetConverterContext(Configuration configuration, List<Warning> warnings) {
+ this(configuration, NoOpFilterValueEmbedder.INSTANCE, warnings);
+ }
+
+ public ParquetConverterContext(Configuration configuration, IExternalFilterValueEmbedder valueEmbedder,
+ List<Warning> warnings) {
this.warnings = warnings;
+ this.valueEmbedder = valueEmbedder;
modifiedUTF8DataOutput = new StandardUTF8ToModifiedUTF8DataOutput(
new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader()));
@@ -137,6 +153,10 @@
}
}
+ public IExternalFilterValueEmbedder getValueEmbedder() {
+ return valueEmbedder;
+ }
+
public List<Warning> getWarnings() {
return warnings;
}
@@ -165,6 +185,16 @@
* ************************************************************************
*/
+ @Override
+ public IMutableValueStorage getSerializedFieldName(String fieldName) throws IOException { // null-tolerant variant of the inherited lookup
+ if (fieldName == null) {
+ // Could happen in the context of Parquet's converters (i.e., in array item converter)
+ return null;
+ }
+
+ return super.getSerializedFieldName(fieldName); // non-null names are handled by the superclass
+ }
+
public void serializeBoolean(boolean value, DataOutput output) {
try {
booleanSerDer.serialize(value ? ABoolean.TRUE : ABoolean.FALSE, output);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
index e6b80d0..24ec4c9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
+import java.io.IOException;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
@@ -35,20 +36,22 @@
public abstract class AbstractComplexConverter extends GroupConverter implements IFieldValue {
protected final AbstractComplexConverter parent;
private final IValueReference fieldName;
+ private final String stringFieldName;
private final int index;
private final Converter[] converters;
protected final ParquetConverterContext context;
protected IMutableValueStorage tempStorage;
AbstractComplexConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
- ParquetConverterContext context) {
+ ParquetConverterContext context) throws IOException {
this(parent, null, index, parquetType, context);
}
- AbstractComplexConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- GroupType parquetType, ParquetConverterContext context) {
+ AbstractComplexConverter(AbstractComplexConverter parent, String stringFieldName, int index, GroupType parquetType,
+ ParquetConverterContext context) throws IOException {
this.parent = parent;
- this.fieldName = fieldName;
+ this.stringFieldName = stringFieldName;
+ this.fieldName = context.getSerializedFieldName(stringFieldName);
this.index = index;
this.context = context;
converters = new Converter[parquetType.getFieldCount()];
@@ -117,7 +120,7 @@
*
* @formatter:on
*/
- protected AbstractComplexConverter createRepeatedConverter(GroupType type, int index) {
+ protected AbstractComplexConverter createRepeatedConverter(GroupType type, int index) throws IOException {
GroupType repeatedType = type.getType(index).asGroupType();
String name = repeatedType.getName();
if (repeatedType.getFieldCount() > 1 || "array".equals(name) || "key_value".equals(name)) {
@@ -129,6 +132,11 @@
}
@Override
+ public String getStringFieldName() {
+ return stringFieldName;
+ }
+
+ @Override
public IValueReference getFieldName() {
return fieldName;
}
@@ -148,7 +156,7 @@
return tempStorage.getDataOutput();
}
- protected IMutableValueStorage getValue() {
+ public IMutableValueStorage getValue() {
return tempStorage;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
index 7eacc87..89647e0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
@@ -25,8 +25,8 @@
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
@@ -35,13 +35,13 @@
private IAsterixListBuilder builder;
public ArrayConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
- ParquetConverterContext context) {
+ ParquetConverterContext context) throws IOException {
super(parent, index, parquetType, context);
}
- public ArrayConverter(AbstractComplexConverter parent, IValueReference fieldName, int index, GroupType parquetType,
- ParquetConverterContext context) {
- super(parent, fieldName, index, parquetType, context);
+ public ArrayConverter(AbstractComplexConverter parent, String stringFieldName, int index, GroupType parquetType,
+ ParquetConverterContext context) throws IOException {
+ super(parent, stringFieldName, index, parquetType, context);
}
@Override
@@ -64,6 +64,11 @@
}
@Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.ARRAY;
+ }
+
+ @Override
public void addValue(IFieldValue value) {
try {
builder.addItem(tempStorage);
@@ -74,19 +79,31 @@
@Override
protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
- PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
- return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
+ try {
+ PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
@Override
protected ArrayConverter createArrayConverter(GroupType type, int index) {
- final GroupType arrayType = type.getType(index).asGroupType();
- return new ArrayConverter(this, index, arrayType, context);
+ try {
+ GroupType arrayType = type.getType(index).asGroupType();
+ return new ArrayConverter(this, index, arrayType, context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
@Override
protected ObjectConverter createObjectConverter(GroupType type, int index) {
- final GroupType objectType = type.getType(index).asGroupType();
- return new ObjectConverter(this, index, objectType, context);
+ try {
+ GroupType objectType = type.getType(index).asGroupType();
+ return new ObjectConverter(this, index, objectType, context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
index 3c8bfcc..542318b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
@@ -21,52 +21,81 @@
import java.io.IOException;
import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
class ObjectConverter extends AbstractComplexConverter {
private IARecordBuilder builder;
+ /**
+ * {@link IExternalFilterValueEmbedder} decides whether the object should be ignored entirely
+ */
+ private boolean ignore = false;
public ObjectConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
- ParquetConverterContext context) {
+ ParquetConverterContext context) throws IOException {
super(parent, index, parquetType, context);
}
- public ObjectConverter(AbstractComplexConverter parent, IValueReference fieldName, int index, GroupType parquetType,
- ParquetConverterContext context) {
- super(parent, fieldName, index, parquetType, context);
+ public ObjectConverter(AbstractComplexConverter parent, String stringFieldName, int index, GroupType parquetType,
+ ParquetConverterContext context) throws IOException {
+ super(parent, stringFieldName, index, parquetType, context);
}
@Override
public void start() {
tempStorage = context.enterObject();
builder = context.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ if (isRoot()) {
+ valueEmbedder.reset();
+ valueEmbedder.enterObject();
+ } else {
+ ignore = checkValueEmbedder(valueEmbedder);
+ }
}
@Override
public void end() {
- try {
- builder.write(getParentDataOutput(), true);
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ if (!ignore) {
+ writeToParent();
+ context.getValueEmbedder().exitObject();
}
- addThisValueToParent();
+
context.exitObject(tempStorage, null, builder);
tempStorage = null;
builder = null;
+ ignore = false;
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.OBJECT;
}
@Override
public void addValue(IFieldValue value) {
+ if (ignore) {
+ // The value was embedded already
+ return;
+ }
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ IValueReference fieldName = value.getFieldName();
try {
- builder.addField(value.getFieldName(), getValue());
+ if (valueEmbedder.shouldEmbed(value.getStringFieldName(), value.getTypeTag())) {
+ builder.addField(fieldName, valueEmbedder.getEmbeddedValue());
+ } else {
+ builder.addField(fieldName, getValue());
+ }
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
@@ -76,8 +105,9 @@
protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
try {
PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
- IValueReference fieldName = context.getSerializedFieldName(type.getFieldName(index));
- return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, fieldName, index, context);
+ String childFieldName = type.getFieldName(index);
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, childFieldName, index,
+ context);
} catch (IOException e) {
throw new IllegalStateException(e);
}
@@ -86,8 +116,8 @@
@Override
protected ArrayConverter createArrayConverter(GroupType type, int index) {
try {
- final IValueReference childFieldName = context.getSerializedFieldName(type.getFieldName(index));
- final GroupType arrayType = type.getType(index).asGroupType();
+ GroupType arrayType = type.getType(index).asGroupType();
+ String childFieldName = type.getFieldName(index);
return new ArrayConverter(this, childFieldName, index, arrayType, context);
} catch (IOException e) {
throw new IllegalStateException(e);
@@ -97,11 +127,50 @@
@Override
protected ObjectConverter createObjectConverter(GroupType type, int index) {
try {
- final IValueReference childFieldName = context.getSerializedFieldName(type.getFieldName(index));
- final GroupType objectType = type.getType(index).asGroupType();
+ GroupType objectType = type.getType(index).asGroupType();
+ String childFieldName = type.getFieldName(index);
return new ObjectConverter(this, childFieldName, index, objectType, context);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
+
+ protected boolean isRoot() { // start() resets the value embedder only when this returns true
+ return false; // NOTE(review): presumably overridden by the root converter — confirm
+ }
+
+ private boolean checkValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) { // called by start() for non-root objects
+ boolean embed = valueEmbedder.shouldEmbed(getStringFieldName(), ATypeTag.OBJECT);
+ if (embed) {
+ ((ArrayBackedValueStorage) parent.getValue()).set(valueEmbedder.getEmbeddedValue()); // NOTE(review): assumes the parent's storage is an ArrayBackedValueStorage — confirm
+ addThisValueToParent(); // the embedded value is handed to the parent immediately, before any child is read
+ } else {
+ valueEmbedder.enterObject(); // not embedded: let the embedder follow into this object
+ }
+ return embed; // start() stores this in 'ignore'; when true, addValue() and end() skip writing this object
+ }
+
+ private void writeToParent() { // called by end() only when the object was not ignored
+ try {
+ finalizeEmbedding(); // add the embedded fields reported as missing before the record is written out
+ builder.write(getParentDataOutput(), true);
+ addThisValueToParent();
+ } catch (IOException e) {
+ throw new IllegalStateException(e); // rethrown unchecked, as in the other converter methods of this class
+ }
+ }
+
+ private void finalizeEmbedding() throws IOException { // adds to the record the embedded fields the embedder reports as missing
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ if (valueEmbedder.isMissingEmbeddedValues()) { // nothing to do unless the embedder reports missing values
+ String[] embeddedFieldNames = valueEmbedder.getEmbeddedFieldNames();
+ for (int i = 0; i < embeddedFieldNames.length; i++) {
+ String embeddedFieldName = embeddedFieldNames[i];
+ if (valueEmbedder.isMissing(embeddedFieldName)) {
+ IValueReference embeddedValue = valueEmbedder.getEmbeddedValue(); // NOTE(review): presumably isMissing() selects the value returned here — confirm
+ builder.addField(context.getSerializedFieldName(embeddedFieldName), embeddedValue);
+ }
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
index 09a104b..3936ad7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
@@ -19,17 +19,19 @@
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
+import java.io.IOException;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
class RepeatedConverter extends AbstractComplexConverter {
public RepeatedConverter(AbstractComplexConverter parent, int index, GroupType parquetType,
- ParquetConverterContext context) {
+ ParquetConverterContext context) throws IOException {
super(parent, index, parquetType, context);
}
@@ -44,25 +46,44 @@
}
@Override
+ public ATypeTag getTypeTag() {
+ // Delegates to the parent converter's type tag.
+ return parent.getTypeTag();
+ }
+
+ @Override
public void addValue(IFieldValue value) {
parent.addValue(value);
}
@Override
protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
- PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
- return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
+ try {
+ PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+ return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, index, context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
@Override
protected ArrayConverter createArrayConverter(GroupType type, int index) {
- final GroupType arrayType = type.getType(index).asGroupType();
- return new ArrayConverter(this, index, arrayType, context);
+ try {
+ GroupType arrayType = type.getType(index).asGroupType();
+ return new ArrayConverter(this, index, arrayType, context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+
}
@Override
protected ObjectConverter createObjectConverter(GroupType type, int index) {
- return new ObjectConverter(this, index, type.getType(index).asGroupType(), context);
+ try {
+ return new ObjectConverter(this, index, type.getType(index).asGroupType(), context);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
index 24a531a..df8f43f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RootConverter.java
@@ -19,8 +19,10 @@
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
import java.io.DataOutput;
+import java.io.IOException;
import java.util.List;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hyracks.api.exceptions.Warning;
@@ -31,8 +33,9 @@
public class RootConverter extends ObjectConverter {
private final ArrayBackedValueStorage rootBuffer;
- public RootConverter(GroupType parquetType, Configuration configuration, List<Warning> warnings) {
- super(null, -1, parquetType, new ParquetConverterContext(configuration, warnings));
+ public RootConverter(GroupType parquetType, IExternalFilterValueEmbedder valueEmbedder, Configuration configuration,
+ List<Warning> warnings) throws IOException {
+ super(null, -1, parquetType, new ParquetConverterContext(configuration, valueEmbedder, warnings));
this.rootBuffer = new ArrayBackedValueStorage();
}
@@ -42,6 +45,11 @@
return rootBuffer.getDataOutput();
}
+ /**
+ * Marks this converter as the root.
+ *
+ * @return {@code true}
+ */
+ @Override
+ protected boolean isRoot() {
+ return true;
+ }
+
public IValueReference getRecord() {
return rootBuffer;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
index 15c1d2e..4f371c8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
@@ -18,16 +18,18 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
public class BinaryConverter extends GenericPrimitiveConverter {
- BinaryConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
- super(parent, fieldName, index, context);
+ BinaryConverter(AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context)
+ throws IOException {
+ super(ATypeTag.BINARY, parent, stringFieldName, index, context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
index c8737cd..07a5f79 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
@@ -18,14 +18,16 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
class DateConverter extends GenericPrimitiveConverter {
- DateConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
- super(parent, fieldName, index, context);
+ DateConverter(AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context)
+ throws IOException {
+ super(ATypeTag.DATE, parent, stringFieldName, index, context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
index e93bcf7..81fb36d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
@@ -18,13 +18,14 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
/**
@@ -36,9 +37,9 @@
private final int precision;
private final int scale;
- DecimalConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context, int precision, int scale) {
- super(parent, fieldName, index, context);
+ DecimalConverter(AbstractComplexConverter parent, String stringFieldName, int index,
+ ParquetConverterContext context, int precision, int scale) throws IOException {
+ super(ATypeTag.DOUBLE, parent, stringFieldName, index, context);
this.precision = precision;
this.scale = scale;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
index e0b0392..20f82f9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
@@ -18,29 +18,45 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
public class GenericPrimitiveConverter extends PrimitiveConverter implements IFieldValue {
-
+ private final ATypeTag typeTag;
protected final AbstractComplexConverter parent;
+ protected final String stringFieldName;
protected final IValueReference fieldName;
protected final int index;
protected final ParquetConverterContext context;
- GenericPrimitiveConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
+ GenericPrimitiveConverter(ATypeTag typeTag, AbstractComplexConverter parent, String stringFieldName, int index,
+ ParquetConverterContext context) throws IOException {
+ this.typeTag = typeTag;
this.parent = parent;
- this.fieldName = fieldName;
+ this.stringFieldName = stringFieldName;
+ this.fieldName = context.getSerializedFieldName(stringFieldName);
this.index = index;
this.context = context;
}
@Override
+ public ATypeTag getTypeTag() {
+ // The tag is the one handed to the constructor.
+ return typeTag;
+ }
+
+ /**
+ * Returns the field name as the plain {@link String} that was passed to the constructor.
+ * {@code getFieldName()} returns the form produced from the same name by
+ * {@code context.getSerializedFieldName(...)} at construction time.
+ *
+ * @return the field name string given at construction
+ */
+ @Override
+ public String getStringFieldName() {
+ return stringFieldName;
+ }
+
+ @Override
public final IValueReference getFieldName() {
return fieldName;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
index 258a10a..8e4c556 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
@@ -25,6 +25,7 @@
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.external.parser.JSONDataParser;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
@@ -37,9 +38,9 @@
private final JSONDataParser parser;
private final ByteArrayAccessibleInputStream in;
- JsonStringConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
- super(parent, fieldName, index, context);
+ JsonStringConverter(AbstractComplexConverter parent, String stringFieldName, int index,
+ ParquetConverterContext context) throws IOException {
+ super(ATypeTag.ANY, parent, stringFieldName, index, context);
parser = new JSONDataParser(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, new JsonFactory());
in = new ByteArrayAccessibleInputStream(EMPTY, 0, 0);
try {
@@ -50,6 +51,14 @@
}
@Override
+ public ATypeTag getTypeTag() {
+     // The constructor registers ATypeTag.ANY; here the tag is looked up from the byte found at
+     // the start offset of the parent's current value.
+     IValueReference current = parent.getValue();
+     byte tagByte = current.getByteArray()[current.getStartOffset()];
+     return ATypeTag.VALUE_TYPE_MAPPING[tagByte];
+ }
+
+ @Override
public void addBinary(Binary value) {
byte[] bytes = value.getBytes();
in.setContent(bytes, 0, value.length());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
index 38c441a..0f36d11 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
@@ -18,11 +18,12 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
@@ -41,12 +42,12 @@
}
public static PrimitiveConverter createPrimitiveConverter(PrimitiveType type, AbstractComplexConverter parent,
- int index, ParquetConverterContext context) {
+ int index, ParquetConverterContext context) throws IOException {
return createPrimitiveConverter(type, parent, null, index, context);
}
public static PrimitiveConverter createPrimitiveConverter(PrimitiveType type, AbstractComplexConverter parent,
- IValueReference fieldName, int index, ParquetConverterContext context) {
+ String stringFieldName, int index, ParquetConverterContext context) throws IOException {
if (type == MISSING) {
return MissingConverter.INSTANCE;
@@ -56,63 +57,65 @@
switch (mappedType) {
case BOOLEAN:
case STRING:
- return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ return new GenericPrimitiveConverter(mappedType, parent, stringFieldName, index, context);
case BIGINT:
- return getIntConverter(type, parent, fieldName, index, context);
+ return getIntConverter(mappedType, type, parent, stringFieldName, index, context);
case DOUBLE:
- return getDoubleConverter(type, parent, fieldName, index, context);
+ return getDoubleConverter(mappedType, type, parent, stringFieldName, index, context);
case BINARY:
- return new BinaryConverter(parent, fieldName, index, context);
+ return new BinaryConverter(parent, stringFieldName, index, context);
case UUID:
- return new UUIDConverter(parent, fieldName, index, context);
+ return new UUIDConverter(parent, stringFieldName, index, context);
case DATE:
- return new DateConverter(parent, fieldName, index, context);
+ return new DateConverter(parent, stringFieldName, index, context);
case TIME:
- return getTimeConverter(type, parent, fieldName, index, context);
+ return getTimeConverter(type, parent, stringFieldName, index, context);
case DATETIME:
- return getTimeStampConverter(type, parent, fieldName, index, context);
+ return getTimeStampConverter(type, parent, stringFieldName, index, context);
case ANY:
- return new JsonStringConverter(parent, fieldName, index, context);
+ return new JsonStringConverter(parent, stringFieldName, index, context);
default:
return MissingConverter.INSTANCE;
}
}
- private static PrimitiveConverter getIntConverter(PrimitiveType type, AbstractComplexConverter parent,
- IValueReference fieldName, int index, ParquetConverterContext context) {
+ private static PrimitiveConverter getIntConverter(ATypeTag typeTag, PrimitiveType type,
+ AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context)
+ throws IOException {
IntLogicalTypeAnnotation intType = (IntLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
if (intType != null && !intType.isSigned()) {
- return new UnsignedIntegerConverter(parent, fieldName, index, context);
+ return new UnsignedIntegerConverter(parent, stringFieldName, index, context);
}
- return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ return new GenericPrimitiveConverter(typeTag, parent, stringFieldName, index, context);
}
- private static PrimitiveConverter getDoubleConverter(PrimitiveType type, AbstractComplexConverter parent,
- IValueReference fieldName, int index, ParquetConverterContext context) {
+ private static PrimitiveConverter getDoubleConverter(ATypeTag typeTag, PrimitiveType type,
+ AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context)
+ throws IOException {
LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
if (logicalType instanceof DecimalLogicalTypeAnnotation) {
DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) logicalType;
- return new DecimalConverter(parent, fieldName, index, context, decimalLogicalType.getPrecision(),
+ return new DecimalConverter(parent, stringFieldName, index, context, decimalLogicalType.getPrecision(),
decimalLogicalType.getScale());
}
- return new GenericPrimitiveConverter(parent, fieldName, index, context);
+ return new GenericPrimitiveConverter(typeTag, parent, stringFieldName, index, context);
}
private static PrimitiveConverter getTimeConverter(PrimitiveType type, AbstractComplexConverter parent,
- IValueReference fieldName, int index, ParquetConverterContext context) {
+ String stringFieldName, int index, ParquetConverterContext context) throws IOException {
TimeLogicalTypeAnnotation timeLogicalType = (TimeLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
- return new TimeConverter(parent, fieldName, index, context, timeLogicalType.getUnit());
+ return new TimeConverter(parent, stringFieldName, index, context, timeLogicalType.getUnit());
}
private static PrimitiveConverter getTimeStampConverter(PrimitiveType type, AbstractComplexConverter parent,
- IValueReference fieldName, int index, ParquetConverterContext context) {
+ String stringFieldName, int index, ParquetConverterContext context) throws IOException {
TimestampLogicalTypeAnnotation tsType = (TimestampLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
if (tsType != null) {
int offset = tsType.isAdjustedToUTC() ? context.getTimeZoneOffset() : 0;
- return new TimestampConverter(parent, fieldName, index, context, tsType.getUnit(), offset);
+ return new TimestampConverter(parent, stringFieldName, index, context, tsType.getUnit(), offset);
}
//INT96: the converter will convert the value to millis
- return new TimestampConverter(parent, fieldName, index, context, TimeUnit.MILLIS, 0);
+ return new TimestampConverter(parent, stringFieldName, index, context, TimeUnit.MILLIS, 0);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
index fa9f36c..c78d6e7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
@@ -18,19 +18,20 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.schema.LogicalTypeAnnotation;
public class TimeConverter extends GenericPrimitiveConverter {
private final LogicalTypeAnnotation.TimeUnit timeUnit;
- TimeConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context, LogicalTypeAnnotation.TimeUnit timeUnit) {
- super(parent, fieldName, index, context);
+ TimeConverter(AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context,
+ LogicalTypeAnnotation.TimeUnit timeUnit) throws IOException {
+ super(ATypeTag.TIME, parent, stringFieldName, index, context);
this.timeUnit = timeUnit;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
index 136febe..c36de37 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
@@ -18,12 +18,13 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -35,9 +36,10 @@
private final LogicalTypeAnnotation.TimeUnit timeUnit;
private final int timeZoneOffset;
- TimestampConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context, LogicalTypeAnnotation.TimeUnit timeUnit, int timeZoneOffset) {
- super(parent, fieldName, index, context);
+ TimestampConverter(AbstractComplexConverter parent, String stringFieldName, int index,
+ ParquetConverterContext context, LogicalTypeAnnotation.TimeUnit timeUnit, int timeZoneOffset)
+ throws IOException {
+ super(ATypeTag.DATETIME, parent, stringFieldName, index, context);
this.timeUnit = timeUnit;
this.timeZoneOffset = timeZoneOffset;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
index ec07c60..23667ca 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
@@ -18,15 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
-import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
public class UUIDConverter extends GenericPrimitiveConverter {
- UUIDConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
- super(parent, fieldName, index, context);
+ UUIDConverter(AbstractComplexConverter parent, String stringFieldName, int index, ParquetConverterContext context)
+ throws IOException {
+ super(ATypeTag.UUID, parent, stringFieldName, index, context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
index 763b8c8..5bb4ad7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
@@ -18,19 +18,20 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+import java.io.IOException;
+
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.data.std.api.IValueReference;
public class UnsignedIntegerConverter extends GenericPrimitiveConverter {
private boolean overflowed;
- UnsignedIntegerConverter(AbstractComplexConverter parent, IValueReference fieldName, int index,
- ParquetConverterContext context) {
- super(parent, fieldName, index, context);
+ UnsignedIntegerConverter(AbstractComplexConverter parent, String stringFieldName, int index,
+ ParquetConverterContext context) throws IOException {
+ super(ATypeTag.BIGINT, parent, stringFieldName, index, context);
overflowed = false;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 9a7494f..ebf4204 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -139,7 +139,7 @@
addExternalProjectionInfo(projectionFiltrationInfo, edd.getProperties());
properties = addSubPath(externalDataSource.getProperties(), properties);
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
- IExternalFilterEvaluatorFactory filterEvaluatorFactory = IndexUtil
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory = metadataProvider
.createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties);
ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(externalDataset, edd.getAdapter(), properties,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index e15063b..19b69f4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1830,6 +1830,12 @@
}
}
+ /**
+ * Creates an {@link IExternalFilterEvaluatorFactory} by delegating to
+ * {@code IndexUtil.createExternalFilterEvaluatorFactory(...)} with the arguments passed through unchanged.
+ * NOTE(review): exposing this on the provider presumably lets subclasses substitute their own factory -- confirm.
+ *
+ * @param context job generation context, forwarded as-is
+ * @param typeEnv variable type environment, forwarded as-is
+ * @param projectionFiltrationInfo projection/filtration info, forwarded as-is
+ * @param properties properties map, forwarded as-is
+ * @return the factory returned by {@code IndexUtil}
+ * @throws AlgebricksException declared by this method; the only call in the body is the delegate
+ */
+ public IExternalFilterEvaluatorFactory createExternalFilterEvaluatorFactory(JobGenContext context,
+ IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo projectionFiltrationInfo,
+ Map<String, String> properties) throws AlgebricksException {
+ return IndexUtil.createExternalFilterEvaluatorFactory(context, typeEnv, projectionFiltrationInfo, properties);
+ }
+
public void validateDatabaseObjectName(DataverseName dataverseName, String objectName, SourceLocation sourceLoc)
throws AlgebricksException {
if (dataverseName != null) {