[ASTERIXDB-3582][COMP] Fix Concurrent Modification in Filter Pushdown
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
The issue occurs due to the presence of nested subplans. While iterating
over the subplan map to identify filters that can be pushed down, we
encounter new subplans that consume the output of the current subplan.
To account for these new subplans, we attempt to add them to the map
during iteration.
However, since the current implementation uses a regular HashMap, which
does not allow modifications while iterating, this results in a
ConcurrentModificationException.
Ext-ref: MB-65792
Change-Id: I636ed79ffd1de8ce8fd0729cf22867835527ff9a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19528
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ritik <ritik.raj@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index cad9315..7d4f7a5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -29,8 +29,10 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import org.apache.asterix.om.base.IAObject;
@@ -55,6 +57,7 @@
private final Map<ILogicalOperator, List<UseDescriptor>> subplanSelects;
private final List<UseDescriptor> scanCandidateFilters;
private final Set<LogicalVariable> subplanProducedVariables;
+ private final Queue<ILogicalOperator> subplanOpsQueue;
public AbstractFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
super(pushdownContext, context);
@@ -62,6 +65,7 @@
subplanSelects = new HashMap<>();
scanCandidateFilters = new ArrayList<>();
subplanProducedVariables = new HashSet<>();
+ subplanOpsQueue = new LinkedList<>();
}
@Override
@@ -191,8 +195,10 @@
* @param scanDescriptor data-scan descriptor
*/
private void putPotentialSelects(ScanDefineDescriptor scanDescriptor) throws AlgebricksException {
- for (Map.Entry<ILogicalOperator, List<UseDescriptor>> selects : subplanSelects.entrySet()) {
- ILogicalOperator subplan = selects.getKey();
+ subplanOpsQueue.clear();
+ subplanOpsQueue.addAll(subplanSelects.keySet());
+ while (!subplanOpsQueue.isEmpty()) {
+ ILogicalOperator subplan = subplanOpsQueue.poll();
subplanProducedVariables.clear();
VariableUtilities.getProducedVariables(subplan, subplanProducedVariables);
for (LogicalVariable producedVar : subplanProducedVariables) {
@@ -236,8 +242,13 @@
boolean inSubplan = useDescriptor.inSubplan();
if (inSubplan && useOperator.getOperatorTag() == LogicalOperatorTag.SELECT) {
ILogicalOperator subplanOp = useDescriptor.getSubplanOperator();
- List<UseDescriptor> selects = subplanSelects.computeIfAbsent(subplanOp, k -> new ArrayList<>());
- selects.add(useDescriptor);
+ List<UseDescriptor> selects = subplanSelects.get(subplanOp);
+ if (selects == null) {
+ subplanOpsQueue.add(subplanOp);
+ subplanSelects.computeIfAbsent(subplanOp, k -> new ArrayList<>()).add(useDescriptor);
+ } else {
+ selects.add(useDescriptor);
+ }
}
// Finally, push down if not in subplan
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp
new file mode 100644
index 0000000..4a22988
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE DATASET reviews
+PRIMARY KEY (itemno: int, custid: string) WITH {
+ "storage-format": { "format": "column" }
+};
+
+CREATE FUNCTION sent(txt) {
+LET pos = ["bomb","needs"], neg=["shrinks","shrunk","smaller"], exp = split(txt, " ")
+SELECT CASE
+ WHEN (
+ (SOME w IN pos SATISFIES (w IN exp))
+ AND
+ (EVERY w IN neg SATISFIES (w NOT IN exp))
+ )
+ THEN "positive"
+ WHEN (
+ (SOME w IN neg SATISFIES (w IN exp))
+ AND
+ (EVERY w IN pos SATISFIES (w NOT IN exp))
+ )
+ THEN "negative"
+ ELSE "neutral"
+ END
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp
new file mode 100644
index 0000000..412c9a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE test;
+
+EXPLAIN SELECT sent(r.text) FROM reviews r;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan
new file mode 100644
index 0000000..6228382
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan
@@ -0,0 +1,117 @@
+distribute result [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$163] <- [{"$1": $$162}] project: [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ project ([$$162]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ subplan {
+ aggregate [$$162] <- [listify($$161)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ assign [$$161] <- [{"$2": switch-case(true, and($$140, $$146), "positive", and($$153, $$159), "negative", "neutral")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |LOCAL|
+ subplan {
+ aggregate [$$159] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (not(if-missing-or-null($$158, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ subplan {
+ aggregate [$$158] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (not(if-missing-or-null(neq($$w, $#6), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ unnest $#6 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ subplan {
+ aggregate [$$153] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select ($$152) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ subplan {
+ aggregate [$$152] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (eq($$w, $#5)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ unnest $#5 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ subplan {
+ aggregate [$$146] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (not(if-missing-or-null($$145, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ subplan {
+ aggregate [$$145] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (not(if-missing-or-null(neq($$w, $#4), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ unnest $#4 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ subplan {
+ aggregate [$$140] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select ($$139) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ subplan {
+ aggregate [$$139] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- AGGREGATE |LOCAL|
+ select (eq($$w, $#3)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_SELECT |LOCAL|
+ unnest $#3 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- UNNEST |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |LOCAL|
+ nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- SUBPLAN |PARTITIONED|
+ assign [$$166] <- [$$r.getField("text")] project: [$$166] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$164, $$165, $$r] <- test.reviews project ({text:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index c112b42..93c28bb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16536,6 +16536,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="column">
+ <compilation-unit name="filter/ASTERIXDB-3582">
+ <output-dir compare="Text">filter/ASTERIXDB-3582</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="column">
<compilation-unit name="filter/ASTERIXDB-3582-2">
<output-dir compare="Text">filter/ASTERIXDB-3582-2</output-dir>
</compilation-unit>