ensure limits are copied down as far as possible and not through select operators
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index d786cff..d91cac0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -21,6 +21,7 @@
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -34,6 +35,8 @@
 
     public LogicalOperatorTag getOperatorTag();
 
+    public ExecutionMode getExecutionMode();
+
     public List<Mutable<ILogicalOperator>> getInputs();
 
     boolean hasInputs();
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 767f458..1c2cfae 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -106,13 +106,13 @@
         physicalPlanRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalPlanRewrites.add(new EnforceStructuralPropertiesRule());
         physicalPlanRewrites.add(new PushProjectDownRule());
-        physicalPlanRewrites.add(new PushLimitDownRule());
+        physicalPlanRewrites.add(new CopyLimitDownRule());
         return physicalPlanRewrites;
     }
 
     public final static List<IAlgebraicRewriteRule> buildPhysicalRewritesTopLevelRuleCollection() {
         List<IAlgebraicRewriteRule> physicalPlanRewrites = new LinkedList<IAlgebraicRewriteRule>();
-        physicalPlanRewrites.add(new PushLimitDownRule());
+        physicalPlanRewrites.add(new CopyLimitDownRule());
         return physicalPlanRewrites;
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
new file mode 100644
index 0000000..2f080fb
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CopyLimitDownRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
+            return false;
+        }
+        LimitOperator limitOp = (LimitOperator) op;
+        if (!limitOp.isTopmostLimitOp()) {
+            return false;
+        }
+
+        List<LogicalVariable> limitUsedVars = new ArrayList<>();
+        VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
+
+        Mutable<ILogicalOperator> safeOpRef = null;
+        Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+
+        List<LogicalVariable> candidateProducedVars = new ArrayList<>();
+        while (true) {
+            candidateProducedVars.clear();
+            ILogicalOperator candidateOp = candidateOpRef.getValue();
+            LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
+            if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
+                    || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+                    || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
+                break;
+            }
+
+            safeOpRef = candidateOpRef;
+            candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+        }
+
+        if (safeOpRef != null) {
+            ILogicalOperator safeOp = safeOpRef.getValue();
+            Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
+            ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+            LimitOperator limitCloneOp = null;
+            if (limitOp.getOffset().getValue() == null) {
+                limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
+            } else {
+                IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
+                        AlgebricksBuiltinFunctions.NUMERIC_ADD);
+                List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
+                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
+                ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+                limitCloneOp = new LimitOperator(maxPlusOffset, false);
+            }
+            limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+            limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
+            limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+            limitCloneOp.recomputeSchema();
+            unsafeOpRef.setValue(limitCloneOp);
+            context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+            context.addToDontApplySet(this, limitOp);
+        }
+
+        return safeOpRef != null;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
deleted file mode 100644
index a0bebaf..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.LinkedList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushLimitDownRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
-        return false;
-    }
-
-    /**
-     * When a global Limit over a merge-exchange is found, a local Limit is
-     * pushed down.
-     */
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
-            return false;
-        }
-        LimitOperator opLim = (LimitOperator) op;
-        if (!opLim.isTopmostLimitOp()) {
-            return false;
-        }
-
-        Mutable<ILogicalOperator> opRef2 = opLim.getInputs().get(0);
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-
-        if (context.checkAndAddToAlreadyCompared(op, op2)) {
-            return false;
-        }
-        if (op2.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
-            return false;
-        }
-        PhysicalOperatorTag op2PTag = op2.getPhysicalOperator().getOperatorTag();
-        // we should test for any kind of merge
-        if (op2PTag != PhysicalOperatorTag.RANDOM_MERGE_EXCHANGE && op2PTag != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
-            return false;
-        }
-
-        LinkedList<LogicalVariable> usedVars1 = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(opLim, usedVars1);
-
-        do {
-            if (op2.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE
-                    || op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE
-                    || op2.getOperatorTag() == LogicalOperatorTag.LIMIT) {
-                return false;
-            }
-            if (op2.getInputs().size() > 1 || !op2.isMap()) {
-                break;
-            }
-            LinkedList<LogicalVariable> vars2 = new LinkedList<LogicalVariable>();
-            VariableUtilities.getProducedVariables(op2, vars2);
-            if (!OperatorPropertiesUtil.disjoint(vars2, usedVars1)) {
-                return false;
-            }
-            // we assume pipelineable ops. have only one input
-            opRef2 = op2.getInputs().get(0);
-            op2 = (AbstractLogicalOperator) opRef2.getValue();
-        } while (true);
-
-        LimitOperator clone2 = null;
-        if (opLim.getOffset().getValue() == null) {
-            clone2 = new LimitOperator(opLim.getMaxObjects().getValue(), false);
-        } else {
-            // push limit (max+offset)
-            IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
-                    AlgebricksBuiltinFunctions.NUMERIC_ADD);
-            ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd,
-                    opLim.getMaxObjects(), opLim.getOffset());
-            clone2 = new LimitOperator(maxPlusOffset, false);
-        }
-        clone2.setPhysicalOperator(new StreamLimitPOperator());
-        clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
-        clone2.setExecutionMode(op2.getExecutionMode());
-        clone2.recomputeSchema();
-        opRef2.setValue(clone2);
-        context.computeAndSetTypeEnvironmentForOperator(clone2);
-        return true;
-    }
-
-}
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 12b5986..1f31e44 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -38,7 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -107,7 +107,7 @@
         PHYSICAL_PLAN_REWRITES.add(new EnforceStructuralPropertiesRule());
         PHYSICAL_PLAN_REWRITES.add(new PushProjectDownRule());
         PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());
-        PHYSICAL_PLAN_REWRITES.add(new PushLimitDownRule());
+        PHYSICAL_PLAN_REWRITES.add(new CopyLimitDownRule());
         PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeWriteRule());
         PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeUnionRule());
     }