[ASTERIXDB-3578][EXT] Error with query on delta table with IN predicate
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-65533
Change-Id: I7177a10da9145a8de862991b3e57213138dae9b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19472
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
new file mode 100644
index 0000000..8e3b6b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" AND ds.name IN ["Order 1", "Order 3", "Order 4"] order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
new file mode 100644
index 0000000..71167a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.03.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" and ds.hour in [10, 16, 18] order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp
new file mode 100644
index 0000000..3bddae5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.04.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds WHERE ds.date = "01-01-2025" or ds.date="01-02-2025" order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp
new file mode 100644
index 0000000..8620657
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.05.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds
+ WHERE (ds.date > "01-02-2025" and ds.hour = 10) or (ds.date < "01-02-2025" and ds.hour = 15)
+ order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp
new file mode 100644
index 0000000..7b88e1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.06.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+SELECT ds.id as id1, ds2.id as id2 FROM DeltalakeDataset as ds, DeltalakeDataset as ds2
+WHERE ds.hour = ds2.hour and ds.date = "01-01-2025" and ds2.date is not null
+order by ds.id, ds2.id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp
new file mode 100644
index 0000000..6478788
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.07.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds
+ WHERE (ds.date > "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or (ds.date < "01-02-2025" and ds.hour = 15)
+ order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp
new file mode 100644
index 0000000..eaf58cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.08.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds
+ WHERE (ds.date >= "01-02-2025" and (ds.hour = 10 or ds.hour=16 or ds.hour=12) and ds.id>1) or
+ (ds.date < "01-02-2025" and ds.hour = 15) or ds.date =10
+ order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp
new file mode 100644
index 0000000..1a9dd7c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.09.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds
+ WHERE ds.date = 10 or ds.date = "01-01-2025"
+ order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm
new file mode 100644
index 0000000..86d41fe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.1.adm
@@ -0,0 +1,5 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
index 86d41fe..cf5ecd4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
@@ -1,5 +1,2 @@
{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
-{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
-{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
-{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
-{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm
new file mode 100644
index 0000000..00ba142
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.3.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm
new file mode 100644
index 0000000..b02a62d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.4.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
+{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 }
+{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 }
+{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 }
+{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm
new file mode 100644
index 0000000..6fd91f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.5.adm
@@ -0,0 +1,2 @@
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm
new file mode 100644
index 0000000..5bfd257
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.6.adm
@@ -0,0 +1,13 @@
+{ "id1": 0, "id2": 0 }
+{ "id1": 0, "id2": 1 }
+{ "id1": 0, "id2": 2 }
+{ "id1": 1, "id2": 0 }
+{ "id1": 1, "id2": 1 }
+{ "id1": 1, "id2": 2 }
+{ "id1": 2, "id2": 0 }
+{ "id1": 2, "id2": 1 }
+{ "id1": 2, "id2": 2 }
+{ "id1": 3, "id2": 3 }
+{ "id1": 3, "id2": 4 }
+{ "id1": 4, "id2": 3 }
+{ "id1": 4, "id2": 4 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm
new file mode 100644
index 0000000..6fd91f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.7.adm
@@ -0,0 +1,2 @@
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm
new file mode 100644
index 0000000..e943ba7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.8.adm
@@ -0,0 +1,6 @@
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
+{ "id": 5, "name": "Order 21", "date": "01-02-2025", "hour": 12 }
+{ "id": 6, "name": "Order 22", "date": "01-02-2025", "hour": 12 }
+{ "id": 7, "name": "Order 30", "date": "01-02-2025", "hour": 16 }
+{ "id": 8, "name": "Order 31", "date": "01-02-2025", "hour": 16 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm
new file mode 100644
index 0000000..86d41fe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.9.adm
@@ -0,0 +1,5 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 3c998a5..4e902b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -62,6 +62,7 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
@@ -107,7 +108,7 @@
Snapshot snapshot;
try {
snapshot = table.getLatestSnapshot(engine);
- } catch (KernelException e) {
+ } catch (KernelException | KernelEngineException e) {
LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
}
@@ -136,30 +137,36 @@
scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
}
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
- CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+ List<Row> scanFiles;
+ try {
+ scanFiles = getScanFiles(scan, engine);
+ } catch (UnsupportedOperationException | IllegalStateException e) {
+ // Delta kernel API failed to apply expression due to type mismatch.
+ // We need to fall back to skip applying the filter and return all files.
+ LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage());
+ scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+ scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+ scanFiles = getScanFiles(scan, engine);
+ }
+ LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size());
+ locationConstraints = getPartitions(appCtx);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+ distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
+ issueWarnings(warnings, warningCollector);
+ }
+ private List<Row> getScanFiles(Scan scan, Engine engine) {
List<Row> scanFiles = new ArrayList<>();
+ CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
while (iter.hasNext()) {
- FilteredColumnarBatch batch = null;
- try {
- batch = iter.next();
- } catch (UnsupportedOperationException e) {
- // Failed to apply expression due to type mismatch. We can skip the files where partitioned column
- // type is different from the type of value provided in the predicate
- LOGGER.info("Unsupported operation {}", e.getMessage());
- continue;
- }
+ FilteredColumnarBatch batch = iter.next();
CloseableIterator<Row> rowIter = batch.getRows();
while (rowIter.hasNext()) {
Row row = rowIter.next();
scanFiles.add(row);
}
}
- LOGGER.info("Number of files to scan: {}", scanFiles.size());
- locationConstraints = getPartitions(appCtx);
- configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
- distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
- issueWarnings(warnings, warningCollector);
+ return scanFiles;
}
private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 82b5dad..fac1199 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -120,6 +120,7 @@
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.KernelException;
public class ExternalDataUtils {
@@ -540,7 +541,7 @@
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
try {
table.getLatestSnapshot(engine);
- } catch (KernelException e) {
+ } catch (KernelException | KernelEngineException e) {
LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
index 73ed81e..112dba2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
@@ -35,6 +35,7 @@
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -48,8 +49,7 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.logging.log4j.LogManager;
-
-import com.microsoft.azure.storage.core.Logger;
+import org.apache.logging.log4j.Logger;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Expression;
@@ -58,7 +58,7 @@
public class DeltaTableFilterBuilder extends AbstractFilterBuilder {
- private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+ private static final Logger LOGGER = LogManager.getLogger();
public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
JobGenContext context, IVariableTypeEnvironment typeEnv) {
@@ -72,7 +72,7 @@
try {
deltaTablePredicate = createExpression(filterExpression);
} catch (Exception e) {
- LOGGER.error("Error creating DeltaTable filter expression ", e);
+ LOGGER.error("Error creating DeltaTable filter expression, skipping filter pushdown", e);
}
}
if (deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) {
@@ -138,12 +138,16 @@
private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException {
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
IFunctionDescriptor fd = resolveFunction(funcExpr);
- List<Expression> args = handleArgs(funcExpr);
FunctionIdentifier fid = fd.getIdentifier();
+ if (funcExpr.getArguments().size() != 2
+ && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) {
+ throw new RuntimeException("Predicate should only have 2 arguments: " + funcExpr);
+ }
+ List<Expression> args = handleArgs(funcExpr);
if (fid.equals(AlgebricksBuiltinFunctions.AND)) {
- return new Predicate("AND", args);
+ return createAndOrPredicate("AND", args, 0);
} else if (fid.equals(AlgebricksBuiltinFunctions.OR)) {
- return new Predicate("OR", args);
+ return createAndOrPredicate("OR", args, 0);
} else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
return new Predicate("=", args);
} else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
@@ -173,8 +177,40 @@
protected Column createColumnExpression(ILogicalExpression expression) {
ARecordType path = filterPaths.get(expression);
if (path.getFieldNames().length != 1) {
- throw new RuntimeException("Unsupported expression: " + expression);
+ throw new RuntimeException("Unsupported column expression: " + expression);
+ } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+ // The field could be a nested field
+ List<String> fieldList = new ArrayList<>();
+ fieldList = createPathExpression(path, fieldList);
+ return new Column(fieldList.toArray(new String[0]));
+ } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+ return new Column(path.getFieldNames()[0]);
+ } else {
+ throw new RuntimeException("Unsupported column expression: " + expression);
}
- return new Column(path.getFieldNames()[0]);
+ }
+
+ private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+ if (path.getFieldNames().length != 1) {
+ throw new RuntimeException("Error creating column expression");
+ } else {
+ fieldList.add(path.getFieldNames()[0]);
+ }
+ if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+ return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList);
+ } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+ return fieldList;
+ } else {
+ throw new RuntimeException("Error creating column expression");
+ }
+ }
+
+ // Converts or(pred1, pred2, pred3) to or(pred1, or(pred2, pred3))
+ private Predicate createAndOrPredicate(String function, List<Expression> args, int index) {
+ if (index == args.size() - 2) {
+ return new Predicate(function, args.get(index), args.get(index + 1));
+ } else {
+ return new Predicate(function, args.get(index), createAndOrPredicate(function, args, index + 1));
+ }
}
}