ASTERIXDB-1157, ASTERIXDB-1051: Push LIMIT down into ORDER
- Push LIMIT information down into the Order (ExternalSort) operator so that a top-K sort can be used.
- CopyLimitDownRule no longer copies LIMIT through an UNNEST_MAP operator.
Change-Id: I49fb5f38fe8eb4b4419e596a03e2187939d9fd2e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/616
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
index 69e2582..e55eef3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OrderOperator.java
@@ -22,7 +22,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -96,17 +95,26 @@
};
private final List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions;
// These are pairs of type (comparison, expr) where comparison is
// ASC or DESC or a boolean function of arity 2 that can take as
// arguments results of expr.
+ // LIMIT (topK) value pushed down into this operator, if any.
+ // -1 means there is no LIMIT push-down optimization applied to this operator.
+ protected final int topK;
public OrderOperator() {
orderExpressions = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+ topK = -1;
}
public OrderOperator(List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions) {
+ this(orderExpressions, -1);
+ }
+
+ public OrderOperator(List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExpressions, int topK) {
this.orderExpressions = orderExpressions;
+ this.topK = topK;
}
@Override
@@ -160,4 +168,13 @@
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
}
+
+ /**
+ * Returns the LIMIT (topK) value that was pushed down into this operator.
+ *
+ * @return the pushed-down topK value, or -1 if no LIMIT push-down was applied
+ */
+ public int getTopK() {
+ return topK;
+ }
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 164bc17..120c1c4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
/**
* This will always be attached to an {@link OrderOperator} logical operator.
@@ -46,10 +47,16 @@
public class StableSortPOperator extends AbstractStableSortPOperator {
private int maxNumberOfFrames;
+ private final int topK; // -1 means that no topK value was provided for this sort
public StableSortPOperator(int maxNumberOfFrames) {
+ this(maxNumberOfFrames, -1);
+ }
+
+ public StableSortPOperator(int maxNumberOfFrames, int topK) {
super();
this.maxNumberOfFrames = maxNumberOfFrames;
+ this.topK = topK;
}
@Override
@@ -65,9 +72,10 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
int n = sortColumns.length;
int[] sortFields = new int[n];
IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
@@ -90,10 +98,37 @@
i++;
}
- ExternalSortOperatorDescriptor sortOpDesc = new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames,
- sortFields, nkcf, comps, recDescriptor);
- contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
- ILogicalOperator src = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, op, 0);
+ // topK == -1 means that a topK value is not provided: use the regular external sort.
+ if (topK == -1) {
+ ExternalSortOperatorDescriptor sortOpDesc = new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames,
+ sortFields, nkcf, comps, recDescriptor);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+ } else {
+ // Since a topK value is provided, the topK optimization is possible:
+ // use TopKSorterOperatorDescriptor instead of ExternalSortOperatorDescriptor.
+ TopKSorterOperatorDescriptor sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK,
+ sortFields, nkcf, comps, recDescriptor);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+ }
+ // The input edge is wired identically for both sort implementations.
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
}
+
+ /**
+ * Prints the operator tag, then the topK value if one was provided,
+ * then the order property if it is set.
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(getOperatorTag().toString());
+ if (topK != -1) {
+ sb.append(" [topK: ").append(topK).append("]");
+ }
+ if (orderProp != null) {
+ sb.append(" ").append(orderProp);
+ }
+ return sb.toString();
+ }
+
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index cf0f1c2..e310ff9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -151,6 +151,11 @@
addIndent(buffer, indent).append("order ");
+ // Print the topK value once per operator, ahead of the order expressions, not once per expression.
+ if (op.getTopK() != -1) {
+ buffer.append("(topK: " + op.getTopK() + ") ");
+ }
+
for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
String fst;
switch (p.first.getKind()) {
case ASC: {
fst = "ASC";
@@ -165,6 +170,7 @@
}
}
buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+
}
return buffer.toString();
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index 0783f0c..a68ed6c 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -23,7 +23,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -48,7 +47,8 @@
}
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
return false;
@@ -71,6 +71,7 @@
LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
|| candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+ || candidateOpTag == LogicalOperatorTag.UNNEST_MAP
|| !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
break;
}
@@ -87,10 +88,14 @@
if (limitOp.getOffset().getValue() == null) {
limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
} else {
- IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
- AlgebricksBuiltinFunctions.NUMERIC_ADD);
+ // The copied limit must keep (limit + offset) tuples, because the original
+ // topmost limit still applies the offset itself; the offset must not be
+ // applied a second time by the copy.
+ IFunctionInfo finfoAdd = context.getMetadataProvider()
+ .lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
- addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+ addArgs.add(
+ new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
limitCloneOp = new LimitOperator(maxPlusOffset, false);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index d85ffe9..e99d126 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -214,8 +214,8 @@
}
}
if (topLevelOp) {
- op.setPhysicalOperator(
- new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+ op.setPhysicalOperator(new StableSortPOperator(
+ physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
} else {
op.setPhysicalOperator(new InMemoryStableSortPOperator());
}