ASTERIXDB-1157, ASTERIXDB-1051: Pushdown limit

- Limit Pushdown into an Order (ExternalSort) operator.
- CopyLimitDownRule doesn't copy LIMIT through UNNESTMAP operator.

Change-Id: I49fb5f38fe8eb4b4419e596a03e2187939d9fd2e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/616
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
index 69e2582..e55eef3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -96,17 +95,26 @@
     };
 
     private final List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+    // In case we can push down LIMIT information into this operator.
+    protected final int topK;
 
     // These are pairs of type (comparison, expr) where comparison is
     // ASC or DESC or a boolean function of arity 2 that can take as
     // arguments results of expr.
 
+    // TopK : -1 means there is no LIMIT push-down optimization applied to this operator.
     public OrderOperator() {
         orderExpressions = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        topK = -1;
     }
 
     public OrderOperator(List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions) {
+        this(orderExpressions, -1);
+    }
+
+    public OrderOperator(List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions, int topK) {
         this.orderExpressions = orderExpressions;
+        this.topK = topK;
     }
 
     @Override
@@ -160,4 +168,8 @@
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
+
+    public int getTopK() {
+        return topK;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 164bc17..120c1c4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
 
 /**
  * This will always be attached to an {@link OrderOperator} logical operator.
@@ -46,10 +47,16 @@
 public class StableSortPOperator extends AbstractStableSortPOperator {
 
     private int maxNumberOfFrames;
+    private int topK;
 
     public StableSortPOperator(int maxNumberOfFrames) {
+        this(maxNumberOfFrames, -1);
+    }
+
+    public StableSortPOperator(int maxNumberOfFrames, int topK) {
         super();
         this.maxNumberOfFrames = maxNumberOfFrames;
+        this.topK = topK;
     }
 
     @Override
@@ -65,9 +72,10 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
         int n = sortColumns.length;
         int[] sortFields = new int[n];
         IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
@@ -90,10 +98,41 @@
             i++;
         }
 
-        ExternalSortOperatorDescriptor sortOpDesc = new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames,
-                sortFields, nkcf, comps, recDescriptor);
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
-        ILogicalOperator src = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, op, 0);
+        // topK == -1 means that a topK value is not provided.
+        if (topK == -1) {
+            ExternalSortOperatorDescriptor sortOpDesc = new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames,
+                    sortFields, nkcf, comps, recDescriptor);
+            contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+            ILogicalOperator src = op.getInputs().get(0).getValue();
+            builder.contributeGraphEdge(src, 0, op, 0);
+        } else {
+            // Since topK value is provided, topK optimization is possible.
+            // We call topKSorter instead of calling ExternalSortOperator.
+            TopKSorterOperatorDescriptor sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK,
+                    sortFields, nkcf, comps, recDescriptor);
+            contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+            ILogicalOperator src = op.getInputs().get(0).getValue();
+            builder.contributeGraphEdge(src, 0, op, 0);
+        }
     }
+
+    @Override
+    public String toString() {
+        if (orderProp == null) {
+            if (topK != -1) {
+                // A topK value is introduced.
+                return getOperatorTag().toString() + " [topK: " + topK + "]";
+            } else {
+                return getOperatorTag().toString();
+            }
+        } else {
+            if (topK != -1) {
+                // A topK value is introduced.
+                return getOperatorTag().toString() + " [topK: " + topK + "]" + " " + orderProp;
+            } else {
+                return getOperatorTag().toString() + " " + orderProp;
+            }
+        }
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index cf0f1c2..e310ff9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -151,6 +151,11 @@
         addIndent(buffer, indent).append("order ");
         for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
             String fst;
+
+            if (op.getTopK() != -1) {
+                buffer.append("(topK: " + op.getTopK() + ") ");
+            }
+
             switch (p.first.getKind()) {
                 case ASC: {
                     fst = "ASC";
@@ -165,6 +170,7 @@
                 }
             }
             buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+
         }
         return buffer.toString();
     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index 0783f0c..a68ed6c 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -23,7 +23,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -48,7 +47,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
             return false;
@@ -71,6 +71,7 @@
             LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
             if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
                     || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+                    || candidateOpTag == LogicalOperatorTag.UNNEST_MAP
                     || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
                 break;
             }
@@ -87,10 +88,14 @@
             if (limitOp.getOffset().getValue() == null) {
                 limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
             } else {
-                IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
-                        AlgebricksBuiltinFunctions.NUMERIC_ADD);
+                // Need to add an offset to the given limit value
+                // since the original topmost limit will use the offset value.
+                // We can't apply the offset multiple times.
+                IFunctionInfo finfoAdd = context.getMetadataProvider()
+                        .lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
                 List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
-                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+                addArgs.add(
+                        new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
                 addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
                 ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
                 limitCloneOp = new LimitOperator(maxPlusOffset, false);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index d85ffe9..e99d126 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -214,8 +214,8 @@
                         }
                     }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(
-                                new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+                        op.setPhysicalOperator(new StableSortPOperator(
+                                physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
                     } else {
                         op.setPhysicalOperator(new InMemoryStableSortPOperator());
                     }