[NO ISSUE][COMP] Copy LIMIT through UNION ALL

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Enhance CopyLimitDownRule to copy LIMIT operators
  down through UNION ALL into each input branch,
  so the limit can reach the underlying data scans

Change-Id: I9df5e853538c14a05108ac6f0f4ef4916a5f8e1a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8064
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
new file mode 100644
index 0000000..4fe8ac9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T1 as (
+  select two from onek1 where two between 1 and 10
+  union all
+  select two from onek2 where two between 1 and 100
+),
+T2 as (
+  select two from onek1 where two between 1 and 1000
+  union all
+  select two from onek2 where two between 1 and 10000
+),
+T3 as (
+  select two from T1
+  union all
+  select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
new file mode 100644
index 0000000..0109646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T1 as (
+  select two from onek1 where two between 1 and 10
+  union all
+  select two from onek2 where two between 1 and 100
+),
+T2 as (
+  select two from onek1 where two between 1 and 1000
+  union all
+  select two from onek2 where two between 1 and 10000
+),
+T3 as (
+  select two from T1
+  union all
+  select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
new file mode 100644
index 0000000..373861b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T as (
+  select two from onek1
+  union all
+  select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
new file mode 100644
index 0000000..41ef7d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T as (
+  select two from onek1
+  union all
+  select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
new file mode 100644
index 0000000..1b39645
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
@@ -0,0 +1,106 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 4
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        union ($$151, $$310, $$t)
+        -- UNION_ALL  |PARTITIONED|
+          exchange
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            union ($$213, $$227, $$151)
+            -- UNION_ALL  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$213])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$213] <- [{"two": $$183}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$183])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$183] <- [$$onek1.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek1])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$187, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 10))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$227])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$227] <- [{"two": $$184}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$184])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$184] <- [$$onek2.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek2])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$188, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 100))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          exchange
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            union ($$345, $$356, $$310)
+            -- UNION_ALL  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$345])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$345] <- [{"two": $$185}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$185])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$185] <- [$$onek1.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek1])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$189, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 1000))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$356])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$356] <- [{"two": $$186}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$186])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$186] <- [$$onek2.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek2])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$190, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 10000))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
new file mode 100644
index 0000000..4a46e2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
@@ -0,0 +1,54 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 4
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        union ($$48, $$54, $$t)
+        -- UNION_ALL  |PARTITIONED|
+          exchange
+          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+            limit 4
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$48])
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$48] <- [{"two": $$103}]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$103])
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$103] <- [$$onek1.getField(2)]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$onek1])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$61, $$onek1] <- test.onek1 condition (gt($$onek1.getField(2), 0)) limit 4
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          exchange
+          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+            limit 4
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$54])
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$54] <- [{"two": $$105}]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$105])
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$105] <- [$$onek2.getField(2)]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$onek2])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$62, $$onek2] <- test.onek2 condition (gt($$onek2.getField(2), 0)) limit 4
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index f0eca82..382b80d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.rewriter.rules;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -62,62 +63,101 @@
         List<LogicalVariable> limitUsedVars = new ArrayList<>();
         VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
 
-        Mutable<ILogicalOperator> safeOpRef = null;
-        Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+        List<ILogicalOperator> safeOps = new ArrayList<>();
+        List<LogicalVariable> tmpCandidateProducedVars = new ArrayList<>();
+        ILogicalOperator limitInputOp = limitOp.getInputs().get(0).getValue();
 
-        List<LogicalVariable> candidateProducedVars = new ArrayList<>();
-        while (true) {
-            ILogicalOperator candidateOp = candidateOpRef.getValue();
-            LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
-            if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
-                    || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.UNNEST_MAP) {
+        findSafeOpsInSubtree(limitInputOp, limitUsedVars, safeOps, tmpCandidateProducedVars);
+        if (safeOps.isEmpty()) {
+            return false;
+        }
+
+        SourceLocation sourceLoc = limitOp.getSourceLocation();
+
+        for (ILogicalOperator safeOp : safeOps) {
+            for (Mutable<ILogicalOperator> unsafeOpRef : safeOp.getInputs()) {
+                ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+                ILogicalExpression maxObjectsExpr = limitOp.getMaxObjects().getValue();
+                ILogicalExpression newMaxObjectsExpr;
+                if (limitOp.getOffset().getValue() == null) {
+                    newMaxObjectsExpr = maxObjectsExpr.cloneExpression();
+                } else {
+                    // Need to add an offset to the given limit value
+                    // since the original topmost limit will use the offset value.
+                    // We can't apply the offset multiple times.
+                    IFunctionInfo finfoAdd =
+                            context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
+                    List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>(2);
+                    addArgs.add(new MutableObject<>(maxObjectsExpr.cloneExpression()));
+                    addArgs.add(new MutableObject<>(limitOp.getOffset().getValue().cloneExpression()));
+                    ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+                    maxPlusOffset.setSourceLocation(sourceLoc);
+                    newMaxObjectsExpr = maxPlusOffset;
+                }
+                LimitOperator limitCloneOp = new LimitOperator(newMaxObjectsExpr, false);
+                limitCloneOp.setSourceLocation(sourceLoc);
+                limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+                limitCloneOp.getInputs().add(new MutableObject<>(unsafeOp));
+                limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+                context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+                limitCloneOp.recomputeSchema();
+                unsafeOpRef.setValue(limitCloneOp);
+            }
+        }
+
+        context.addToDontApplySet(this, limitOp);
+
+        return true;
+    }
+
+    private boolean findSafeOpsInSubtree(ILogicalOperator candidateOp, List<LogicalVariable> limitUsedVars,
+            Collection<? super ILogicalOperator> outSafeOps, List<LogicalVariable> tmpCandidateProducedVars)
+            throws AlgebricksException {
+        ILogicalOperator safeOp = null;
+
+        while (isSafeOpCandidate(candidateOp)) {
+            tmpCandidateProducedVars.clear();
+            VariableUtilities.getProducedVariables(candidateOp, tmpCandidateProducedVars);
+            if (!OperatorPropertiesUtil.disjoint(limitUsedVars, tmpCandidateProducedVars)) {
                 break;
             }
 
-            candidateProducedVars.clear();
-            VariableUtilities.getProducedVariables(candidateOp, candidateProducedVars);
-            if (!OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
-                break;
+            List<Mutable<ILogicalOperator>> candidateOpInputs = candidateOp.getInputs();
+            if (candidateOpInputs.size() > 1) {
+                boolean foundSafeOpInBranch = false;
+                for (Mutable<ILogicalOperator> inputOpRef : candidateOpInputs) {
+                    foundSafeOpInBranch |= findSafeOpsInSubtree(inputOpRef.getValue(), limitUsedVars, outSafeOps,
+                            tmpCandidateProducedVars);
+                }
+                if (!foundSafeOpInBranch) {
+                    outSafeOps.add(candidateOp);
+                }
+                return true;
             }
 
-            safeOpRef = candidateOpRef;
-            candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+            safeOp = candidateOp;
+            candidateOp = candidateOpInputs.get(0).getValue();
         }
 
-        if (safeOpRef != null) {
-            ILogicalOperator safeOp = safeOpRef.getValue();
-            Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
-            ILogicalOperator unsafeOp = unsafeOpRef.getValue();
-            SourceLocation sourceLoc = limitOp.getSourceLocation();
-            LimitOperator limitCloneOp = null;
-            if (limitOp.getOffset().getValue() == null) {
-                limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
-                limitCloneOp.setSourceLocation(sourceLoc);
-            } else {
-                // Need to add an offset to the given limit value
-                // since the original topmost limit will use the offset value.
-                // We can't apply the offset multiple times.
-                IFunctionInfo finfoAdd =
-                        context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
-                List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
-                addArgs.add(
-                        new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
-                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
-                ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
-                maxPlusOffset.setSourceLocation(sourceLoc);
-                limitCloneOp = new LimitOperator(maxPlusOffset, false);
-                limitCloneOp.setSourceLocation(sourceLoc);
-            }
-            limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
-            limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
-            limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
-            OperatorPropertiesUtil.computeSchemaRecIfNull((AbstractLogicalOperator) unsafeOp);
-            limitCloneOp.recomputeSchema();
-            unsafeOpRef.setValue(limitCloneOp);
-            context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
-            context.addToDontApplySet(this, limitOp);
+        if (safeOp != null) {
+            outSafeOps.add(safeOp);
+            return true;
+        } else {
+            return false;
         }
+    }
 
-        return safeOpRef != null;
+    private static boolean isSafeOpCandidate(ILogicalOperator op) {
+        switch (op.getOperatorTag()) {
+            case UNIONALL:
+                return true;
+            // exclude following 'map' operators because they change cardinality
+            case SELECT:
+            case UNNEST:
+            case UNNEST_MAP:
+                return false;
+            default:
+                return op.getInputs().size() == 1 && op.isMap();
+        }
     }
 }