[NO ISSUE][COMP] Copy LIMIT through UNION ALL
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Enhance CopyLimitDownRule to support copying
  LIMIT operators down through UNION ALL into each input branch
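- Example (a sketch based on the new union_opt_1 tests): after this
  change a query of the form

      with T as (
        select two from onek1
        union all
        select two from onek2
      )
      select value t from T t
      where two > 0
      limit 4;

  gets "limit 4" copied below each branch of the UNION ALL and pushed
  into both data-scans (see the union_opt_1.9 plan in this change), so
  each branch produces at most 4 tuples before the final LIMIT is applied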
Change-Id: I9df5e853538c14a05108ac6f0f4ef4916a5f8e1a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8064
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
new file mode 100644
index 0000000..4fe8ac9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T1 as (
+ select two from onek1 where two between 1 and 10
+ union all
+ select two from onek2 where two between 1 and 100
+),
+T2 as (
+ select two from onek1 where two between 1 and 1000
+ union all
+ select two from onek2 where two between 1 and 10000
+),
+T3 as (
+ select two from T1
+ union all
+ select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
new file mode 100644
index 0000000..0109646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T1 as (
+ select two from onek1 where two between 1 and 10
+ union all
+ select two from onek2 where two between 1 and 100
+),
+T2 as (
+ select two from onek1 where two between 1 and 1000
+ union all
+ select two from onek2 where two between 1 and 10000
+),
+T3 as (
+ select two from T1
+ union all
+ select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
new file mode 100644
index 0000000..373861b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T as (
+ select two from onek1
+ union all
+ select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
new file mode 100644
index 0000000..41ef7d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T as (
+ select two from onek1
+ union all
+ select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
new file mode 100644
index 0000000..1b39645
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
@@ -0,0 +1,106 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |UNPARTITIONED|
+ exchange
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ union ($$151, $$310, $$t)
+ -- UNION_ALL |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ union ($$213, $$227, $$151)
+ -- UNION_ALL |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ project ([$$213])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$213] <- [{"two": $$183}]
+ -- ASSIGN |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$183])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$183] <- [$$onek1.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek1])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$187, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 10))) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ project ([$$227])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$227] <- [{"two": $$184}]
+ -- ASSIGN |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$184])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$184] <- [$$onek2.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek2])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$188, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 100))) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ union ($$345, $$356, $$310)
+ -- UNION_ALL |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ project ([$$345])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$345] <- [{"two": $$185}]
+ -- ASSIGN |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$185])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$185] <- [$$onek1.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek1])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$189, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 1000))) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ project ([$$356])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$356] <- [{"two": $$186}]
+ -- ASSIGN |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$186])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$186] <- [$$onek2.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek2])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$190, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 10000))) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
new file mode 100644
index 0000000..4a46e2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
@@ -0,0 +1,54 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |UNPARTITIONED|
+ exchange
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ union ($$48, $$54, $$t)
+ -- UNION_ALL |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$48])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$48] <- [{"two": $$103}]
+ -- ASSIGN |PARTITIONED|
+ project ([$$103])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$103] <- [$$onek1.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek1])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$61, $$onek1] <- test.onek1 condition (gt($$onek1.getField(2), 0)) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- RANDOM_PARTITION_EXCHANGE |PARTITIONED|
+ limit 4
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$54])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$54] <- [{"two": $$105}]
+ -- ASSIGN |PARTITIONED|
+ project ([$$105])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$105] <- [$$onek2.getField(2)]
+ -- ASSIGN |PARTITIONED|
+ project ([$$onek2])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$62, $$onek2] <- test.onek2 condition (gt($$onek2.getField(2), 0)) limit 4
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index f0eca82..382b80d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
@@ -62,62 +63,101 @@
List<LogicalVariable> limitUsedVars = new ArrayList<>();
VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
- Mutable<ILogicalOperator> safeOpRef = null;
- Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+ List<ILogicalOperator> safeOps = new ArrayList<>();
+ List<LogicalVariable> tmpCandidateProducedVars = new ArrayList<>();
+ ILogicalOperator limitInputOp = limitOp.getInputs().get(0).getValue();
- List<LogicalVariable> candidateProducedVars = new ArrayList<>();
- while (true) {
- ILogicalOperator candidateOp = candidateOpRef.getValue();
- LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
- if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
- || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.UNNEST_MAP) {
+ findSafeOpsInSubtree(limitInputOp, limitUsedVars, safeOps, tmpCandidateProducedVars);
+ if (safeOps.isEmpty()) {
+ return false;
+ }
+
+ SourceLocation sourceLoc = limitOp.getSourceLocation();
+
+ for (ILogicalOperator safeOp : safeOps) {
+ for (Mutable<ILogicalOperator> unsafeOpRef : safeOp.getInputs()) {
+ ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+ ILogicalExpression maxObjectsExpr = limitOp.getMaxObjects().getValue();
+ ILogicalExpression newMaxObjectsExpr;
+ if (limitOp.getOffset().getValue() == null) {
+ newMaxObjectsExpr = maxObjectsExpr.cloneExpression();
+ } else {
+ // Need to add an offset to the given limit value
+ // since the original topmost limit will use the offset value.
+ // We can't apply the offset multiple times.
+ IFunctionInfo finfoAdd =
+ context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
+ List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>(2);
+ addArgs.add(new MutableObject<>(maxObjectsExpr.cloneExpression()));
+ addArgs.add(new MutableObject<>(limitOp.getOffset().getValue().cloneExpression()));
+ ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+ maxPlusOffset.setSourceLocation(sourceLoc);
+ newMaxObjectsExpr = maxPlusOffset;
+ }
+ LimitOperator limitCloneOp = new LimitOperator(newMaxObjectsExpr, false);
+ limitCloneOp.setSourceLocation(sourceLoc);
+ limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+ limitCloneOp.getInputs().add(new MutableObject<>(unsafeOp));
+ limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+ context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+ limitCloneOp.recomputeSchema();
+ unsafeOpRef.setValue(limitCloneOp);
+ }
+ }
+
+ context.addToDontApplySet(this, limitOp);
+
+ return true;
+ }
+
+ private boolean findSafeOpsInSubtree(ILogicalOperator candidateOp, List<LogicalVariable> limitUsedVars,
+ Collection<? super ILogicalOperator> outSafeOps, List<LogicalVariable> tmpCandidateProducedVars)
+ throws AlgebricksException {
+ ILogicalOperator safeOp = null;
+
+ while (isSafeOpCandidate(candidateOp)) {
+ tmpCandidateProducedVars.clear();
+ VariableUtilities.getProducedVariables(candidateOp, tmpCandidateProducedVars);
+ if (!OperatorPropertiesUtil.disjoint(limitUsedVars, tmpCandidateProducedVars)) {
break;
}
- candidateProducedVars.clear();
- VariableUtilities.getProducedVariables(candidateOp, candidateProducedVars);
- if (!OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
- break;
+ List<Mutable<ILogicalOperator>> candidateOpInputs = candidateOp.getInputs();
+ if (candidateOpInputs.size() > 1) {
+ boolean foundSafeOpInBranch = false;
+ for (Mutable<ILogicalOperator> inputOpRef : candidateOpInputs) {
+ foundSafeOpInBranch |= findSafeOpsInSubtree(inputOpRef.getValue(), limitUsedVars, outSafeOps,
+ tmpCandidateProducedVars);
+ }
+ if (!foundSafeOpInBranch) {
+ outSafeOps.add(candidateOp);
+ }
+ return true;
}
- safeOpRef = candidateOpRef;
- candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+ safeOp = candidateOp;
+ candidateOp = candidateOpInputs.get(0).getValue();
}
- if (safeOpRef != null) {
- ILogicalOperator safeOp = safeOpRef.getValue();
- Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
- ILogicalOperator unsafeOp = unsafeOpRef.getValue();
- SourceLocation sourceLoc = limitOp.getSourceLocation();
- LimitOperator limitCloneOp = null;
- if (limitOp.getOffset().getValue() == null) {
- limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
- limitCloneOp.setSourceLocation(sourceLoc);
- } else {
- // Need to add an offset to the given limit value
- // since the original topmost limit will use the offset value.
- // We can't apply the offset multiple times.
- IFunctionInfo finfoAdd =
- context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
- List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
- addArgs.add(
- new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
- addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
- ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
- maxPlusOffset.setSourceLocation(sourceLoc);
- limitCloneOp = new LimitOperator(maxPlusOffset, false);
- limitCloneOp.setSourceLocation(sourceLoc);
- }
- limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
- limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
- limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
- OperatorPropertiesUtil.computeSchemaRecIfNull((AbstractLogicalOperator) unsafeOp);
- limitCloneOp.recomputeSchema();
- unsafeOpRef.setValue(limitCloneOp);
- context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
- context.addToDontApplySet(this, limitOp);
+ if (safeOp != null) {
+ outSafeOps.add(safeOp);
+ return true;
+ } else {
+ return false;
}
+ }
- return safeOpRef != null;
+ private static boolean isSafeOpCandidate(ILogicalOperator op) {
+ switch (op.getOperatorTag()) {
+ case UNIONALL:
+ return true;
+ // exclude following 'map' operators because they change cardinality
+ case SELECT:
+ case UNNEST:
+ case UNNEST_MAP:
+ return false;
+ default:
+ return op.getInputs().size() == 1 && op.isMap();
+ }
}
}
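Note on LIMIT with OFFSET (a sketch, not covered by the new tests): the
copied LIMIT operators do not repeat the offset, since only the topmost
LIMIT applies it; instead each copy uses maxObjects + offset, as built by
the NUMERIC_ADD call above. For a hypothetical query such as

    with T as (
      select two from onek1
      union all
      select two from onek2
    )
    select value t from T t
    limit 5 offset 2;

the rule would place "limit 7" (5 + 2) below each UNION ALL branch, while
the original "limit 5 offset 2" still runs on top of the merged stream.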