[ASTERIXDB-2344] LIMIT pushdown for primary index
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
-Implement LIMIT pushdown for primary index scan/lookup. In case of
a select operator, we also push the select condition to the priamry
search operator to ensure correctness.
Change-Id: I824fcad79995325e12a1a81d629160025294b915
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2541
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index beb47a8..39d33f0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -59,6 +60,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
/**
* Contributes the runtime operator for an unnest-map representing a BTree search.
@@ -120,13 +122,23 @@
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
Dataset dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ ITupleFilterFactory tupleFilterFactory = null;
+ long outputLimit = -1;
+ if (unnestMap.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+ UnnestMapOperator unnestMapOp = (UnnestMapOperator) unnestMap;
+ outputLimit = unnestMapOp.getOutputLimit();
+ if (unnestMapOp.getSelectCondition() != null) {
+ tupleFilterFactory = metadataProvider.createTupleFilterFactory(new IOperatorSchema[] { opSchema },
+ typeEnv, unnestMapOp.getSelectCondition().getValue(), context);
+ }
+ }
// By nature, LEFT_OUTER_UNNEST_MAP should generate null values for non-matching tuples.
boolean retainMissing = op.getOperatorTag() == LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP;
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainMissing, dataset,
jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
jobGenParams.isHighKeyInclusive(), propagateFilter, minFilterFieldIndexes, maxFilterFieldIndexes,
- unnestMap.getGenerateCallBackProceedResultVar());
+ tupleFilterFactory, outputLimit, unnestMap.getGenerateCallBackProceedResultVar());
builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index cd217ab..134c96f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -63,6 +63,7 @@
import org.apache.asterix.optimizer.rules.PushFieldAccessRule;
import org.apache.asterix.optimizer.rules.PushGroupByThroughProduct;
import org.apache.asterix.optimizer.rules.PushLimitIntoOrderByRule;
+import org.apache.asterix.optimizer.rules.PushLimitIntoPrimarySearchRule;
import org.apache.asterix.optimizer.rules.PushProperJoinThroughProduct;
import org.apache.asterix.optimizer.rules.PushSimilarityFunctionsBelowJoin;
import org.apache.asterix.optimizer.rules.RemoveLeftOuterUnnestForLeftOuterJoinRule;
@@ -350,6 +351,7 @@
// We are going to apply a constant folding rule again for this case.
physicalRewritesTopLevel.add(new ConstantFoldingRule(appCtx));
physicalRewritesTopLevel.add(new PushLimitIntoOrderByRule());
+ physicalRewritesTopLevel.add(new PushLimitIntoPrimarySearchRule());
physicalRewritesTopLevel.add(new IntroduceProjectsRule());
physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
new file mode 100644
index 0000000..921d231
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import org.apache.asterix.optimizer.rules.am.AccessMethodUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pattern:
+ * SCAN or UNNEST_MAP -> (SELECT)? -> (EXCHANGE)? -> LIMIT
+ * We push both SELECT condition and LIMIT to SCAN or UNNEST_MAP
+ *
+ */
+public class PushLimitIntoPrimarySearchRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator op = opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
+ return false;
+ }
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
+ context.addToDontApplySet(this, op);
+
+ Long outputLimit = getOutputLimit((LimitOperator) op);
+ if (outputLimit == null) {
+ // we cannot push if limit is not constant
+ return false;
+ }
+
+ Mutable<ILogicalOperator> childOp = op.getInputs().get(0);
+ if (childOp.getValue().getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+ childOp = childOp.getValue().getInputs().get(0);
+ }
+ boolean changed = false;
+ if (childOp.getValue().getOperatorTag() == LogicalOperatorTag.SELECT) {
+ changed = rewriteSelect(childOp, outputLimit);
+ } else {
+ changed = setLimitForScanOrUnnestMap(childOp.getValue(), outputLimit);
+ }
+ if (changed) {
+ OperatorPropertiesUtil.typeOpRec(opRef, context);
+ }
+ return changed;
+ }
+
+ private Long getOutputLimit(LimitOperator limit) {
+ if (limit.getMaxObjects().getValue().getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return null;
+ }
+ long outputLimit = AccessMethodUtils.getInt64Constant(limit.getMaxObjects());
+ if (limit.getOffset() != null && limit.getOffset().getValue() != null) {
+ if (limit.getOffset().getValue().getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return null;
+ }
+ outputLimit += AccessMethodUtils.getInt64Constant(limit.getOffset());
+ }
+ return outputLimit;
+ }
+
+ private boolean rewriteSelect(Mutable<ILogicalOperator> op, long outputLimit) throws AlgebricksException {
+ SelectOperator select = (SelectOperator) op.getValue();
+ Set<LogicalVariable> selectedVariables = new HashSet<>();
+ select.getCondition().getValue().getUsedVariables(selectedVariables);
+ ILogicalOperator child = select.getInputs().get(0).getValue();
+ boolean changed = false;
+ if (child.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator scan = (DataSourceScanOperator) child;
+ if (isScanPushable(scan, selectedVariables)) {
+ scan.setSelectCondition(select.getCondition());
+ scan.setOutputLimit(outputLimit);
+ changed = true;
+ }
+ } else if (child.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+ UnnestMapOperator unnestMap = (UnnestMapOperator) child;
+ if (isUnnestMapPushable(unnestMap, selectedVariables)) {
+ unnestMap.setSelectCondition(select.getCondition());
+ unnestMap.setOutputLimit(outputLimit);
+ changed = true;
+ }
+ }
+ if (changed) {
+ // SELECT is not needed
+ op.setValue(child);
+ }
+ return changed;
+ }
+
+ private boolean setLimitForScanOrUnnestMap(ILogicalOperator op, long outputLimit) throws AlgebricksException {
+ if (op.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator scan = (DataSourceScanOperator) op;
+ if (isScanPushable(scan, Collections.emptySet())) {
+ scan.setOutputLimit(outputLimit);
+ return true;
+ }
+ } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
+ UnnestMapOperator unnestMap = (UnnestMapOperator) op;
+ if (isUnnestMapPushable(unnestMap, Collections.emptySet())) {
+ unnestMap.setOutputLimit(outputLimit);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isUnnestMapPushable(UnnestMapOperator op, Set<LogicalVariable> selectedVariables) {
+ if (op.getOutputLimit() >= 0) {
+ // already pushed
+ return false;
+ }
+ ILogicalExpression unnestExpr = op.getExpressionRef().getValue();
+ if (op.propagatesInput() || unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+ if (!f.getFunctionIdentifier().equals(BuiltinFunctions.INDEX_SEARCH)) {
+ return false;
+ }
+ AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+ jobGenParams.readFromFuncArgs(f.getArguments());
+ if (!jobGenParams.isPrimaryIndex()) {
+ return false;
+ }
+ if (!op.getScanVariables().containsAll(selectedVariables)) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isScanPushable(DataSourceScanOperator op, Set<LogicalVariable> selectedVariables) {
+ if (op.getOutputLimit() >= 0) {
+ return false;
+ }
+ if (!op.getInputs().isEmpty()
+ && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+ return false;
+ }
+ if (((DataSource) op.getDataSource()).getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+ return false;
+ }
+ if (!op.getScanVariables().containsAll(selectedVariables)) {
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.1.ddl.sqlpp
new file mode 100644
index 0000000..e2782c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.1.ddl.sqlpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Test push down limit and select condition into the primary index lookup operator
+ * Expected Result : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : bigint,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create index idx_LineItem_suppkey on LineItem (l_suppkey) type btree;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.2.update.sqlpp
new file mode 100644
index 0000000..546a831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.query.sqlpp
new file mode 100644
index 0000000..b8eac5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+explain select element c
+from LineItem as c
+where c.l_suppkey < 150 AND l_extendedprice < 10000
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.query.sqlpp
new file mode 100644
index 0000000..212479a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+select element c
+from LineItem as c
+where c.l_suppkey < 150 AND l_extendedprice < 10000
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.1.ddl.sqlpp
new file mode 100644
index 0000000..e9daaad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.1.ddl.sqlpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Test push down limit into primary key lookup operator
+ * Expected Result : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : bigint,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create index idx_LineItem_suppkey on LineItem (l_suppkey) type btree;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.2.update.sqlpp
new file mode 100644
index 0000000..546a831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.query.sqlpp
new file mode 100644
index 0000000..43f7d94
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+explain select element c
+from LineItem as c
+where (c.l_suppkey < 150)
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.query.sqlpp
new file mode 100644
index 0000000..026aed7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+select element c
+from LineItem as c
+where (c.l_suppkey < 150)
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp
new file mode 100644
index 0000000..f214dc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+/*
+ * Description : Test push down limit and select condition into primary index scan operator
+ * Expected Result : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.DBLPType as
+{
+ id : bigint,
+ dblpid : string,
+ title : string,
+ authors : string,
+ misc : string
+};
+
+create dataset DBLP1(DBLPType) primary key id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp
new file mode 100644
index 0000000..2e0f056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+
+load dataset DBLP1 using localfs ((`path`=`asterix_nc1://data/dblp-small/dblp-small-id.txt`),(`format`=`delimited-text`),(`delimiter`=`:`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.query.sqlpp
new file mode 100644
index 0000000..657bfb4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+explain
+ select element paper
+ from DBLP1 as paper
+ where contains(dblpid, 'kimL89')
+ limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.query.sqlpp
new file mode 100644
index 0000000..6459b5c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+select element paper
+from DBLP1 as paper
+where contains(dblpid, 'kimL89')
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..162cc35
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+/*
+ * Description : Test push down limit into the primary index scan operator
+ * Expected Result : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.DBLPType as
+{
+ id : bigint,
+ dblpid : string,
+ title : string,
+ authors : string,
+ misc : string
+};
+
+create dataset DBLP1(DBLPType) primary key id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp
new file mode 100644
index 0000000..2e0f056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+
+load dataset DBLP1 using localfs ((`path`=`asterix_nc1://data/dblp-small/dblp-small-id.txt`),(`format`=`delimited-text`),(`delimiter`=`:`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.query.sqlpp
new file mode 100644
index 0000000..d2cb38a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+explain
+ select element paper
+ from DBLP1 as paper
+ limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.query.sqlpp
new file mode 100644
index 0000000..5b7c5be
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/* scan and print a delimited text file */
+
+use test;
+
+select element paper
+from DBLP1 as paper
+limit 5 offset 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.adm
new file mode 100644
index 0000000..9c4866f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.3.adm
@@ -0,0 +1,34 @@
+distribute result [$$c]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5, 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ project ([$$c])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- SORT_MERGE_EXCHANGE [$$15(ASC), $$16(ASC) ] |PARTITIONED|
+ limit 10
+ -- STREAM_LIMIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ unnest-map [$$15, $$16, $$c] <- index-search("LineItem", 0, "test", "LineItem", FALSE, FALSE, 2, $$21, $$22, 2, $$21, $$22, TRUE, TRUE, TRUE) condition (and(lt($$c.getField(2), 150), lt($$c.getField(5), 10000))) limit 10
+ -- BTREE_SEARCH |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ order (ASC, $$21) (ASC, $$22)
+ -- STABLE_SORT [$$21(ASC), $$22(ASC)] |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$21, $$22])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ unnest-map [$$20, $$21, $$22] <- index-search("idx_LineItem_suppkey", 0, "test", "LineItem", FALSE, FALSE, 0, 1, $$19, TRUE, FALSE, FALSE)
+ -- BTREE_SEARCH |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$19] <- [150]
+ -- ASSIGN |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.adm
new file mode 100644
index 0000000..3d466ac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup-select/push-limit-to-primary-lookup-select.4.adm
@@ -0,0 +1,5 @@
+{ "l_orderkey": 32, "l_partkey": 3, "l_suppkey": 8, "l_linenumber": 4, "l_quantity": 4, "l_extendedprice": 3612.0, "l_discount": 0.09, "l_tax": 0.03, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1995-08-04", "l_commitdate": "1995-10-01", "l_receiptdate": "1995-09-03", "l_shipinstruct": "NONE", "l_shipmode": "REG AIR", "l_comment": "e slyly final pac" }
+{ "l_orderkey": 32, "l_partkey": 12, "l_suppkey": 6, "l_linenumber": 6, "l_quantity": 6, "l_extendedprice": 5472.06, "l_discount": 0.04, "l_tax": 0.03, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1995-07-21", "l_commitdate": "1995-09-23", "l_receiptdate": "1995-07-25", "l_shipinstruct": "COLLECT COD", "l_shipmode": "RAIL", "l_comment": " gifts cajole carefully." }
+{ "l_orderkey": 33, "l_partkey": 138, "l_suppkey": 4, "l_linenumber": 3, "l_quantity": 5, "l_extendedprice": 5190.65, "l_discount": 0.05, "l_tax": 0.03, "l_returnflag": "A", "l_linestatus": "F", "l_shipdate": "1993-12-09", "l_commitdate": "1993-12-25", "l_receiptdate": "1993-12-23", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "AIR", "l_comment": ". stealthily bold exc" }
+{ "l_orderkey": 34, "l_partkey": 170, "l_suppkey": 7, "l_linenumber": 3, "l_quantity": 6, "l_extendedprice": 6421.02, "l_discount": 0.02, "l_tax": 0.06, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1998-10-30", "l_commitdate": "1998-09-20", "l_receiptdate": "1998-11-05", "l_shipinstruct": "NONE", "l_shipmode": "FOB", "l_comment": "ar foxes sleep " }
+{ "l_orderkey": 35, "l_partkey": 121, "l_suppkey": 4, "l_linenumber": 3, "l_quantity": 7, "l_extendedprice": 7147.84, "l_discount": 0.06, "l_tax": 0.04, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-01-19", "l_commitdate": "1995-12-22", "l_receiptdate": "1996-01-29", "l_shipinstruct": "NONE", "l_shipmode": "MAIL", "l_comment": " the carefully regular " }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.adm
new file mode 100644
index 0000000..29357ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.3.adm
@@ -0,0 +1,34 @@
+distribute result [$$c]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5, 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ project ([$$c])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- SORT_MERGE_EXCHANGE [$$12(ASC), $$13(ASC) ] |PARTITIONED|
+ limit 10
+ -- STREAM_LIMIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ unnest-map [$$12, $$13, $$c] <- index-search("LineItem", 0, "test", "LineItem", FALSE, FALSE, 2, $$17, $$18, 2, $$17, $$18, TRUE, TRUE, TRUE) condition (lt($$c.getField(2), 150)) limit 10
+ -- BTREE_SEARCH |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ order (ASC, $$17) (ASC, $$18)
+ -- STABLE_SORT [$$17(ASC), $$18(ASC)] |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$17, $$18])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ unnest-map [$$16, $$17, $$18] <- index-search("idx_LineItem_suppkey", 0, "test", "LineItem", FALSE, FALSE, 0, 1, $$15, TRUE, FALSE, FALSE)
+ -- BTREE_SEARCH |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ assign [$$15] <- [150]
+ -- ASSIGN |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.adm
new file mode 100644
index 0000000..eeedd56
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-lookup/push-limit-to-primary-lookup.4.adm
@@ -0,0 +1,5 @@
+{ "l_orderkey": 1, "l_partkey": 16, "l_suppkey": 3, "l_linenumber": 6, "l_quantity": 32, "l_extendedprice": 29312.32, "l_discount": 0.07, "l_tax": 0.02, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1996-01-30", "l_commitdate": "1996-02-07", "l_receiptdate": "1996-02-03", "l_shipinstruct": "DELIVER IN PERSON", "l_shipmode": "MAIL", "l_comment": "arefully slyly ex" }
+{ "l_orderkey": 2, "l_partkey": 107, "l_suppkey": 2, "l_linenumber": 1, "l_quantity": 38, "l_extendedprice": 38269.8, "l_discount": 0.0, "l_tax": 0.05, "l_returnflag": "N", "l_linestatus": "O", "l_shipdate": "1997-01-28", "l_commitdate": "1997-01-14", "l_receiptdate": "1997-02-02", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": "ven requests. deposits breach a" }
+{ "l_orderkey": 3, "l_partkey": 5, "l_suppkey": 2, "l_linenumber": 1, "l_quantity": 45, "l_extendedprice": 40725.0, "l_discount": 0.06, "l_tax": 0.0, "l_returnflag": "R", "l_linestatus": "F", "l_shipdate": "1994-02-02", "l_commitdate": "1994-01-04", "l_receiptdate": "1994-02-23", "l_shipinstruct": "NONE", "l_shipmode": "AIR", "l_comment": "ongside of the furiously brave acco" }
+{ "l_orderkey": 3, "l_partkey": 20, "l_suppkey": 10, "l_linenumber": 2, "l_quantity": 49, "l_extendedprice": 45080.98, "l_discount": 0.1, "l_tax": 0.0, "l_returnflag": "R", "l_linestatus": "F", "l_shipdate": "1993-11-09", "l_commitdate": "1993-12-20", "l_receiptdate": "1993-11-24", "l_shipinstruct": "TAKE BACK RETURN", "l_shipmode": "RAIL", "l_comment": " unusual accounts. eve" }
+{ "l_orderkey": 3, "l_partkey": 129, "l_suppkey": 8, "l_linenumber": 3, "l_quantity": 27, "l_extendedprice": 27786.24, "l_discount": 0.06, "l_tax": 0.07, "l_returnflag": "A", "l_linestatus": "F", "l_shipdate": "1994-01-16", "l_commitdate": "1993-11-22", "l_receiptdate": "1994-01-23", "l_shipinstruct": "DELIVER IN PERSON", "l_shipmode": "SHIP", "l_comment": "nal foxes wake. " }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.adm
new file mode 100644
index 0000000..b60a0ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.3.adm
@@ -0,0 +1,20 @@
+distribute result [$$paper]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5, 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ project ([$$paper])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- SORT_MERGE_EXCHANGE [$$12(ASC) ] |PARTITIONED|
+ limit 10
+ -- STREAM_LIMIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$12, $$paper] <- test.DBLP1 condition (contains($$paper.getField(1), "kimL89")) limit 10
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.adm
new file mode 100644
index 0000000..afc422a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.4.adm
@@ -0,0 +1,5 @@
+{ "id": 41, "dblpid": "books/aw/kimL89/EllisG89", "title": "Active Objects Ealities and Possibilities.", "authors": "Clarence A. Ellis Simon J. Gibbs", "misc": "2002-01-03 561-572 1989 Object-Oriented Concepts, Databases, and Applications db/books/collections/kim89.html#EllisG89" }
+{ "id": 42, "dblpid": "books/aw/kimL89/FishmanABCCDHHKLLMNRSW89", "title": "Overview of the Iris DBMS.", "authors": "Daniel H. Fishman Jurgen Annevelink David Beech E. C. Chow Tim Connors J. W. Davis Waqar Hasan C. G. Hoch William Kent S. Leichner Peter Lyngbæk Brom Mahbod Marie-Anne Neimat Tore Risch Ming-Chien Shan W. Kevin Wilkinson", "misc": "2002-01-03 219-250 Object-Oriented Concepts, Databases, and Applications ACM Press and Addison-Wesley 1989 db/books/collections/kim89.html#FishmanABCCDHHKLLMNRSW89" }
+{ "id": 43, "dblpid": "books/aw/kimL89/KimBCGW89", "title": "Features of the ORION Object-Oriented Database System.", "authors": "Won Kim Nat Ballou Hong-Tai Chou Jorge F. Garza Darrell Woelk", "misc": "2002-01-03 251-282 Object-Oriented Concepts, Databases, and Applications ACM Press and Addison-Wesley 1989 db/books/collections/kim89.html#KimBCGW89" }
+{ "id": 44, "dblpid": "books/aw/kimL89/KimKD89", "title": "Indexing Techniques for Object-Oriented Databases.", "authors": "Won Kim Kyung-Chang Kim Alfred G. Dale", "misc": "2002-01-03 371-394 Object-Oriented Concepts, Databases, and Applications ACM Press and Addison-Wesley 1989 db/books/collections/kim89.html#KimKD89" }
+{ "id": 45, "dblpid": "books/aw/kimL89/King89", "title": "My Cat Is Object-Oriented.", "authors": "Roger King", "misc": "2002-01-03 23-30 1989 Object-Oriented Concepts, Databases, and Applications db/books/collections/kim89.html#King89" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.adm
new file mode 100644
index 0000000..daeeeb5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.3.adm
@@ -0,0 +1,20 @@
+distribute result [$$paper]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5, 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ project ([$$paper])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- SORT_MERGE_EXCHANGE [$$10(ASC) ] |PARTITIONED|
+ limit 10
+ -- STREAM_LIMIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$10, $$paper] <- test.DBLP1 limit 10
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.adm
new file mode 100644
index 0000000..e9f7751
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan/push-limit-to-primary-scan.4.adm
@@ -0,0 +1,5 @@
+{ "id": 6, "dblpid": "books/acm/kim95/DittrichD95", "title": "Where Object-Oriented DBMSs Should Do Better A Critique Based on Early Experiences.", "authors": "Angelika Kotz Dittrich Klaus R. Dittrich", "misc": "2002-01-03 238-254 1995 Modern Database Systems db/books/collections/kim95.html#DittrichD95" }
+{ "id": 7, "dblpid": "books/acm/kim95/Garcia-MolinaH95", "title": "Distributed Databases.", "authors": "Hector Garcia-Molina Meichun Hsu", "misc": "2002-01-03 477-493 1995 Modern Database Systems db/books/collections/kim95.html#Garcia-MolinaH95" }
+{ "id": 8, "dblpid": "books/acm/kim95/Goodman95", "title": "An Object-Oriented DBMS War Story Developing a Genome Mapping Database in C++.", "authors": "Nathan Goodman", "misc": "2002-01-03 216-237 1995 Modern Database Systems db/books/collections/kim95.html#Goodman95" }
+{ "id": 9, "dblpid": "books/acm/kim95/Kaiser95", "title": "Cooperative Transactions for Multiuser Environments.", "authors": "Gail E. Kaiser", "misc": "2002-01-03 409-433 1995 Modern Database Systems db/books/collections/kim95.html#Kaiser95" }
+{ "id": 10, "dblpid": "books/acm/kim95/KelleyGKRG95", "title": "Schema Architecture of the UniSQL/M Multidatabase System", "authors": "William Kelley Sunit K. Gala Won Kim Tom C. Reyes Bruce Graham", "misc": "2004-03-08 Modern Database Systems books/acm/Kim95 621-648 1995 db/books/collections/kim95.html#KelleyGKRG95" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 278f3f5..bd62ffe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9863,5 +9863,27 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="limit">
+ <test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-primary-scan">
+ <output-dir compare="Text">push-limit-to-primary-scan</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-primary-scan-select">
+ <output-dir compare="Text">push-limit-to-primary-scan-select</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-primary-lookup">
+ <output-dir compare="Text">push-limit-to-primary-lookup</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-primary-lookup-select">
+ <output-dir compare="Text">push-limit-to-primary-lookup-select</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
&GeoQueries;
</test-suite>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
index 66283d6..88449f9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public abstract class DataSource implements IDataSource<DataSourceId> {
@@ -159,7 +160,8 @@
public abstract Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException;
}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index ddab5bc..62cce05 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -21,6 +21,8 @@
import java.util.List;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
@@ -42,10 +44,11 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public class DatasetDataSource extends DataSource {
- private Dataset dataset;
+ private final Dataset dataset;
public DatasetDataSource(DataSourceId id, Dataset dataset, IAType itemType, IAType metaItemType,
byte datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain) throws AlgebricksException {
@@ -92,11 +95,16 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException {
switch (dataset.getDatasetType()) {
case EXTERNAL:
+ if (tupleFilterFactory != null || outputLimit >= 0) {
+ throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ "Tuple filter and limit are not supported by ExternalDataSource");
+ }
Dataset externalDataset = ((DatasetDataSource) dataSource).getDataset();
String itemTypeName = externalDataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
@@ -117,7 +125,8 @@
int[] maxFilterFieldIndexes = createFilterIndexes(maxFilterVars, opSchema);
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false,
((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
- true, false, minFilterFieldIndexes, maxFilterFieldIndexes, false);
+ true, false, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
+ false);
default:
throw new AlgebricksException("Unknown datasource type");
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 5c3ed56..81be1f7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -22,7 +22,8 @@
import java.util.List;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
@@ -48,6 +49,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public class FeedDataSource extends DataSource implements IMutationDataSource {
@@ -158,10 +160,15 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException {
try {
+ if (tupleFilterFactory != null || outputLimit >= 0) {
+ throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ "Tuple filter and limit are not supported by FeedDataSource");
+ }
ARecordType feedOutputType = (ARecordType) itemType;
ISerializerDeserializer payloadSerde =
metadataProvider.getDataFormat().getSerdeProvider().getSerializerDeserializer(feedOutputType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index d2b9871..3b5cf2e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -25,6 +25,8 @@
import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.metadata.api.IDatasourceFunction;
import org.apache.asterix.om.types.IAType;
@@ -44,6 +46,7 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public abstract class FunctionDataSource extends DataSource {
@@ -68,9 +71,14 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException {
+ if (tupleFilterFactory != null || outputLimit >= 0) {
+ throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ "tuple filter and limit are not supported by FunctionDataSource");
+ }
GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 8fdcbbc..3460a46 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -24,6 +24,8 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
@@ -44,6 +46,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public class LoadableDataSource extends DataSource {
@@ -124,9 +127,14 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException {
+ if (tupleFilterFactory != null || outputLimit >= 0) {
+ throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ "tuple filter and limit are not supported by LoadableDataSource");
+ }
LoadableDataSource alds = (LoadableDataSource) dataSource;
ARecordType itemType = (ARecordType) alds.getLoadedType();
IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a081fb4..7f8d31d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -38,6 +37,7 @@
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
@@ -132,6 +132,7 @@
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -380,10 +381,12 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<DataSourceId> dataSource, List<LogicalVariable> scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
- List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
+ List<LogicalVariable> maxFilterVars, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec,
+ Object implConfig) throws AlgebricksException {
return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables,
- projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, implConfig);
+ projectPushed, minFilterVars, maxFilterVars, tupleFilterFactory, outputLimit, opSchema, typeEnv,
+ context, jobSpec, implConfig);
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
@@ -433,7 +436,8 @@
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, boolean propagateFilter, int[] minFilterFieldIndexes,
- int[] maxFilterFieldIndexes, boolean isIndexOnlyPlan) throws AlgebricksException {
+ int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+ boolean isIndexOnlyPlan) throws AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
@@ -477,8 +481,8 @@
btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes, propagateFilter, proceedIndexOnlyPlan, failValueForIndexOnlyPlan,
- successValueForIndexOnlyPlan);
+ maxFilterFieldIndexes, propagateFilter, tupleFilterFactory, outputLimit, proceedIndexOnlyPlan,
+ failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
} else {
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
@@ -1541,7 +1545,7 @@
}
}
- private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+ public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
throws AlgebricksException {
// No filtering condition.
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryBooleanInspector.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryBooleanInspector.java
index fd4df0d..6b33408 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryBooleanInspector.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryBooleanInspector.java
@@ -18,11 +18,14 @@
*/
package org.apache.asterix.formats.nontagged;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class BinaryBooleanInspector implements IBinaryBooleanInspector {
private static final BinaryBooleanInspector INSTANCE = new BinaryBooleanInspector();
@@ -36,11 +39,13 @@
}
};
+ private static final String NAME = "boolean-inspector";
+
private BinaryBooleanInspector() {
}
@Override
- public boolean getBooleanValue(byte[] bytes, int offset, int length) {
+ public boolean getBooleanValue(byte[] bytes, int offset, int length) throws HyracksDataException {
byte serializedTypeTag = bytes[offset];
if (serializedTypeTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG
|| serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
@@ -49,8 +54,9 @@
/** check if the runtime type is boolean */
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serializedTypeTag);
if (typeTag != ATypeTag.BOOLEAN) {
- throw new IllegalStateException("Runtime error: the select condition should be of the boolean type!");
+ throw new RuntimeDataException(ErrorCode.TYPE_MISMATCH, NAME, 0, ATypeTag.BOOLEAN, typeTag);
}
+
return bytes[offset + 1] == 1;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/AsterixTupleFilter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/AsterixTupleFilter.java
index 2712a47..96ce843 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/AsterixTupleFilter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/base/AsterixTupleFilter.java
@@ -41,7 +41,7 @@
}
@Override
- public boolean accept(IFrameTupleReference tuple) throws Exception {
+ public boolean accept(IFrameTupleReference tuple) throws HyracksDataException {
eval.evaluate(tuple, p);
return boolInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 669988c..2a97b6f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -78,6 +78,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 6860147..ae66ee2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public interface IMetadataProvider<S, I> {
public IDataSource<S> findDataSource(S id) throws AlgebricksException;
@@ -47,7 +48,8 @@
*/
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
throws AlgebricksException;
@@ -206,5 +208,9 @@
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException;
+ public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
+ throws AlgebricksException;
+
public Map<String, String> getConfig();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index 2511fa9..6f11dc1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -35,7 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class DataSourceScanOperator extends AbstractDataSourceOperator {
- private List<LogicalVariable> projectVars;
+ private final List<LogicalVariable> projectVars;
private boolean projectPushed = false;
@@ -43,9 +43,22 @@
private List<LogicalVariable> minFilterVars;
private List<LogicalVariable> maxFilterVars;
+ // the select condition in the SELECT operator. Only results satisfying this selectCondition
+ // would be returned by this operator
+ private Mutable<ILogicalExpression> selectCondition;
+ // the maximum of number of results output by this operator
+ private long outputLimit = -1;
+
public DataSourceScanOperator(List<LogicalVariable> variables, IDataSource<?> dataSource) {
+ this(variables, dataSource, null, -1);
+ }
+
+ public DataSourceScanOperator(List<LogicalVariable> variables, IDataSource<?> dataSource,
+ Mutable<ILogicalExpression> selectCondition, long outputLimit) {
super(variables, dataSource);
projectVars = new ArrayList<LogicalVariable>();
+ this.selectCondition = selectCondition;
+ this.outputLimit = outputLimit;
}
@Override
@@ -133,4 +146,20 @@
public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
return additionalFilteringExpressions;
}
+
+ public Mutable<ILogicalExpression> getSelectCondition() {
+ return selectCondition;
+ }
+
+ public void setSelectCondition(Mutable<ILogicalExpression> selectCondition) {
+ this.selectCondition = selectCondition;
+ }
+
+ public long getOutputLimit() {
+ return outputLimit;
+ }
+
+ public void setOutputLimit(long outputLimit) {
+ this.outputLimit = outputLimit;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index e7fb6c0..c4bcc52 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -32,9 +32,23 @@
public class UnnestMapOperator extends AbstractUnnestMapOperator {
+ // the select condition in the SELECT operator. Only results satisfying this selectCondition
+ // would be returned by this operator
+ private Mutable<ILogicalExpression> selectCondition;
+ // the maximum of number of results output by this operator
+ private long outputLimit = -1;
+
public UnnestMapOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression,
List<Object> variableTypes, boolean propagateInput) {
+ this(variables, expression, variableTypes, propagateInput, null, -1);
+ }
+
+ public UnnestMapOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression,
+ List<Object> variableTypes, boolean propagateInput, Mutable<ILogicalExpression> selectCondition,
+ long outputLimit) {
super(variables, expression, variableTypes, propagateInput);
+ this.selectCondition = selectCondition;
+ this.outputLimit = outputLimit;
}
@Override
@@ -64,4 +78,20 @@
return env;
}
+ public Mutable<ILogicalExpression> getSelectCondition() {
+ return selectCondition;
+ }
+
+ public void setSelectCondition(Mutable<ILogicalExpression> selectCondition) {
+ this.selectCondition = selectCondition;
+ }
+
+ public long getOutputLimit() {
+ return outputLimit;
+ }
+
+ public void setOutputLimit(long outputLimit) {
+ this.outputLimit = outputLimit;
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 74afdf5..d6062ee 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -317,8 +317,10 @@
@Override
public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, ILogicalOperator arg)
throws AlgebricksException {
- DataSourceScanOperator opCopy =
- new DataSourceScanOperator(deepCopyVariableList(op.getVariables()), op.getDataSource());
+ Mutable<ILogicalExpression> newSelectCondition = op.getSelectCondition() != null
+ ? exprDeepCopyVisitor.deepCopyExpressionReference(op.getSelectCondition()) : null;
+ DataSourceScanOperator opCopy = new DataSourceScanOperator(deepCopyVariableList(op.getVariables()),
+ op.getDataSource(), newSelectCondition, op.getOutputLimit());
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
@@ -535,9 +537,11 @@
@Override
public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg)
throws AlgebricksException {
+ Mutable<ILogicalExpression> newSelectCondition = op.getSelectCondition() != null
+ ? exprDeepCopyVisitor.deepCopyExpressionReference(op.getSelectCondition()) : null;
UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()),
exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()), op.getVariableTypes(),
- op.propagatesInput());
+ op.propagatesInput(), newSelectCondition, op.getOutputLimit());
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 600714b..0db0f74 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -35,11 +35,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -225,8 +225,10 @@
public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
ArrayList<LogicalVariable> newInputList = new ArrayList<>();
newInputList.addAll(op.getVariables());
+ Mutable<ILogicalExpression> newSelectCondition =
+ op.getSelectCondition() != null ? deepCopyExpressionRef(op.getSelectCondition()) : null;
return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
- new ArrayList<>(op.getVariableTypes()), op.propagatesInput());
+ new ArrayList<>(op.getVariableTypes()), op.propagatesInput(), newSelectCondition, op.getOutputLimit());
}
@Override
@@ -242,7 +244,11 @@
public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
ArrayList<LogicalVariable> newInputList = new ArrayList<>();
newInputList.addAll(op.getVariables());
- return new DataSourceScanOperator(newInputList, op.getDataSource());
+ Mutable<ILogicalExpression> newSelectCondition =
+ op.getSelectCondition() != null ? deepCopyExpressionRef(op.getSelectCondition()) : null;
+ DataSourceScanOperator newOp =
+ new DataSourceScanOperator(newInputList, op.getDataSource(), newSelectCondition, op.getOutputLimit());
+ return newOp;
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index cf24ee7..3587e29 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -34,11 +34,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -134,6 +134,9 @@
}
}
substVarTypes(op, pair);
+ if (op.getSelectCondition() != null) {
+ op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
+ }
return null;
}
@@ -316,6 +319,9 @@
public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
substituteVarsForAbstractUnnestMapOp(op, pair);
+ if (op.getSelectCondition() != null) {
+ op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index b8cb4ff..d57a998 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -19,7 +19,9 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -34,11 +36,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -76,7 +78,7 @@
public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {
- private Collection<LogicalVariable> usedVariables;
+ private final Collection<LogicalVariable> usedVariables;
public UsedVariableVisitor(Collection<LogicalVariable> usedVariables) {
this.usedVariables = usedVariables;
@@ -105,6 +107,12 @@
e.getValue().getUsedVariables(usedVariables);
}
}
+ if (op.getSelectCondition() != null) {
+ Set<LogicalVariable> usedVariablesBySelect = new HashSet<>();
+ op.getSelectCondition().getValue().getUsedVariables(usedVariablesBySelect);
+ usedVariablesBySelect.removeAll(op.getVariables());
+ usedVariables.addAll(usedVariablesBySelect);
+ }
return null;
}
@@ -305,6 +313,12 @@
@Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) {
getUsedVarsFromExprAndFilterExpr(op);
+ if (op.getSelectCondition() != null) {
+ Set<LogicalVariable> usedVariablesBySelect = new HashSet<>();
+ op.getSelectCondition().getValue().getUsedVariables(usedVariablesBySelect);
+ usedVariablesBySelect.removeAll(op.getVariables());
+ usedVariables.addAll(usedVariablesBySelect);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index 1421cef..e8c5c64 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -43,11 +43,12 @@
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@SuppressWarnings("rawtypes")
public class DataSourceScanPOperator extends AbstractScanPOperator {
- private IDataSource<?> dataSource;
+ private final IDataSource<?> dataSource;
private Object implConfig;
public DataSourceScanPOperator(IDataSource<?> dataSource) {
@@ -109,9 +110,15 @@
List<LogicalVariable> vars = scan.getVariables();
List<LogicalVariable> projectVars = scan.getProjectVariables();
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p =
- mp.getScannerRuntime(dataSource, vars, projectVars, scan.isProjectPushed(), scan.getMinFilterVars(),
- scan.getMaxFilterVars(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig);
+ ITupleFilterFactory tupleFilterFactory = null;
+ if (scan.getSelectCondition() != null) {
+ tupleFilterFactory = context.getMetadataProvider().createTupleFilterFactory(
+ new IOperatorSchema[] { opSchema }, typeEnv, scan.getSelectCondition().getValue(), context);
+ }
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = mp.getScannerRuntime(dataSource, vars, projectVars,
+ scan.isProjectPushed(), scan.getMinFilterVars(), scan.getMaxFilterVars(), tupleFilterFactory,
+ scan.getOutputLimit(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig);
builder.contributeHyracksOperator(scan, p.first);
if (p.second != null) {
builder.contributeAlgebricksPartitionConstraint(p.first, p.second);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 70f19c1..99ed738 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -299,21 +299,25 @@
@Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
- return printAbstractUnnestMapOperator(op, indent, "unnest-map");
+ AlgebricksAppendable plan = printAbstractUnnestMapOperator(op, indent, "unnest-map");
+ appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
+ appendLimitInformation(plan, op.getOutputLimit());
+ return null;
}
@Override
public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Integer indent)
throws AlgebricksException {
- return printAbstractUnnestMapOperator(op, indent, "left-outer-unnest-map");
+ printAbstractUnnestMapOperator(op, indent, "left-outer-unnest-map");
+ return null;
}
- private Void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Integer indent, String opSignature)
- throws AlgebricksException {
+ private AlgebricksAppendable printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Integer indent,
+ String opSignature) throws AlgebricksException {
AlgebricksAppendable plan = addIndent(indent).append(opSignature + " " + op.getVariables() + " <- "
+ op.getExpressionRef().getValue().accept(exprVisitor, indent));
appendFilterInformation(plan, op.getMinFilterVars(), op.getMaxFilterVars());
- return null;
+ return plan;
}
@Override
@@ -321,6 +325,24 @@
AlgebricksAppendable plan = addIndent(indent).append(
"data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
appendFilterInformation(plan, op.getMinFilterVars(), op.getMaxFilterVars());
+ appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
+ appendLimitInformation(plan, op.getOutputLimit());
+ return null;
+ }
+
+ private Void appendSelectConditionInformation(AlgebricksAppendable plan,
+ Mutable<ILogicalExpression> selectCondition, Integer indent) throws AlgebricksException {
+ if (selectCondition != null) {
+ plan.append(" condition (").append(selectCondition.getValue().accept(exprVisitor, indent)).append(")");
+ }
+
+ return null;
+ }
+
+ private Void appendLimitInformation(AlgebricksAppendable plan, long outputLimit) throws AlgebricksException {
+ if (outputLimit >= 0) {
+ plan.append(" limit ").append(String.valueOf(outputLimit));
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 1076eb5..0163a2e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -83,7 +83,7 @@
public class IdCounter {
private int id;
- private Deque<Integer> prefix;
+ private final Deque<Integer> prefix;
public IdCounter() {
prefix = new LinkedList<Integer>();
@@ -400,24 +400,28 @@
@Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
- return printAbstractUnnestMapOperator(op, indent, "unnest-map");
+ AlgebricksAppendable plan = printAbstractUnnestMapOperator(op, indent, "unnest-map");
+ appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
+ appendLimitInformation(plan, op.getOutputLimit(), indent);
+ return null;
}
@Override
public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Integer indent)
throws AlgebricksException {
- return printAbstractUnnestMapOperator(op, indent, "left-outer-unnest-map");
+ printAbstractUnnestMapOperator(op, indent, "left-outer-unnest-map");
+ return null;
}
- private Void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Integer indent, String opSignature)
- throws AlgebricksException {
+ private AlgebricksAppendable printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Integer indent,
+ String opSignature) throws AlgebricksException {
AlgebricksAppendable plan = addIndent(indent).append("\"operator\": \"" + opSignature + "\"");
variablePrintHelper(op.getVariables(), indent);
buffer.append(",\n");
addIndent(indent).append("\"expressions\": \""
+ op.getExpressionRef().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
appendFilterInformation(plan, op.getMinFilterVars(), op.getMaxFilterVars(), indent);
- return null;
+ return plan;
}
@Override
@@ -435,6 +439,8 @@
addIndent(indent).append("\"data-source\": \"" + op.getDataSource() + "\"");
}
appendFilterInformation(plan, op.getMinFilterVars(), op.getMaxFilterVars(), indent);
+ appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
+ appendLimitInformation(plan, op.getOutputLimit(), indent);
return null;
}
@@ -467,6 +473,25 @@
return null;
}
+ private Void appendSelectConditionInformation(AlgebricksAppendable plan, Mutable<ILogicalExpression> condition,
+ Integer indent) throws AlgebricksException {
+ if (condition != null) {
+ plan.append(",\n");
+ addIndent(indent).append(
+ "\"condition\": \"" + condition.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+ }
+ return null;
+ }
+
+ private Void appendLimitInformation(AlgebricksAppendable plan, long outputLimit, Integer indent)
+ throws AlgebricksException {
+ if (outputLimit >= 0) {
+ plan.append(",\n");
+ addIndent(indent).append("\"limit\": \"" + outputLimit + "\"");
+ }
+ return null;
+ }
+
private void appendVars(List<LogicalVariable> minFilterVars) throws AlgebricksException {
boolean first = true;
for (LogicalVariable v : minFilterVars) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 4649d6d..2cb2d35 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -289,21 +289,24 @@
@Override
public String visitUnnestMapOperator(UnnestMapOperator op, Void noArgs) throws AlgebricksException {
stringBuilder.setLength(0);
- return printAbstractUnnestMapOperator(op, "unnest-map");
+ printAbstractUnnestMapOperator(op, "unnest-map");
+ appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
+ appendLimitInformation(stringBuilder, op.getOutputLimit());
+ return stringBuilder.toString();
}
@Override
public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void noArgs)
throws AlgebricksException {
stringBuilder.setLength(0);
- return printAbstractUnnestMapOperator(op, "left-outer-unnest-map");
+ printAbstractUnnestMapOperator(op, "left-outer-unnest-map");
+ return stringBuilder.toString();
}
- private String printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature) {
+ private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature) {
stringBuilder.append(opSignature).append(" ").append(op.getVariables()).append(" <- ")
.append(op.getExpressionRef().getValue().toString());
appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
- return stringBuilder.toString();
}
@Override
@@ -312,10 +315,12 @@
stringBuilder.append("data-scan ").append(op.getProjectVariables()).append("<-").append(op.getVariables())
.append(" <- ").append(op.getDataSource());
appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars());
+ appendSelectConditionInformation(stringBuilder, op.getSelectCondition());
+ appendLimitInformation(stringBuilder, op.getOutputLimit());
return stringBuilder.toString();
}
- private String appendFilterInformation(StringBuilder plan, List<LogicalVariable> minFilterVars,
+ private void appendFilterInformation(StringBuilder plan, List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars) {
if (minFilterVars != null || maxFilterVars != null) {
plan.append(" with filter on");
@@ -326,7 +331,21 @@
if (maxFilterVars != null) {
plan.append(" max:").append(maxFilterVars);
}
- return stringBuilder.toString();
+ }
+
+ private Void appendSelectConditionInformation(StringBuilder plan, Mutable<ILogicalExpression> condition)
+ throws AlgebricksException {
+ if (condition != null) {
+ plan.append(" condition:").append(condition.getValue().toString());
+ }
+ return null;
+ }
+
+ private Void appendLimitInformation(StringBuilder plan, long outputLimit) throws AlgebricksException {
+ if (outputLimit >= 0) {
+ plan.append(" limit:").append(String.valueOf(outputLimit));
+ }
+ return null;
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryBooleanInspector.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryBooleanInspector.java
index 24a5e21..d507e5a 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryBooleanInspector.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryBooleanInspector.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.algebricks.data;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public interface IBinaryBooleanInspector {
- public boolean getBooleanValue(byte[] bytes, int offset, int length);
+ public boolean getBooleanValue(byte[] bytes, int offset, int length) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 49eca6f..566d8e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
@@ -48,6 +49,8 @@
protected boolean appendOpCallbackProceedResult;
protected byte[] searchCallbackProceedResultFalseValue;
protected byte[] searchCallbackProceedResultTrueValue;
+ protected final ITupleFilterFactory tupleFilterFactory;
+ protected final long outputLimit;
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -56,7 +59,7 @@
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter) {
this(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes, appendIndexFilter, false, null, null);
+ maxFilterFieldIndexes, appendIndexFilter, null, -1, false, null, null);
}
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
@@ -64,8 +67,8 @@
IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
- boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) {
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+ byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -80,6 +83,8 @@
this.maxFilterFieldIndexes = maxFilterFieldIndexes;
this.appendIndexFilter = appendIndexFilter;
this.outRecDescs[0] = outRecDesc;
+ this.tupleFilterFactory = tupleFilterFactory;
+ this.outputLimit = outputLimit;
this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
@@ -92,7 +97,7 @@
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+ tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 6bbf437..70b88fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -50,7 +51,7 @@
throws HyracksDataException {
this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
- missingWriterFactory, searchCallbackFactory, appendIndexFilter, false, null, null);
+ missingWriterFactory, searchCallbackFactory, appendIndexFilter, null, -1, false, null, null);
}
public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
@@ -58,11 +59,12 @@
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+ byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue)
+ throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+ tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITupleFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITupleFilter.java
index 50673cf..b5dfe00 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITupleFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITupleFilter.java
@@ -19,8 +19,9 @@
package org.apache.hyracks.storage.am.common.api;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public interface ITupleFilter {
- public boolean accept(IFrameTupleReference tuple) throws Exception;
+ public boolean accept(IFrameTupleReference tuple) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 2c691f1..fb287cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -37,11 +38,14 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -95,22 +99,43 @@
protected byte[] searchCallbackProceedResultFalseValue;
protected byte[] searchCallbackProceedResultTrueValue;
+ protected final ITupleFilterFactory tupleFilterFactory;
+ protected ReferenceFrameTupleReference referenceFilterTuple;
+ // filter out tuples based on the query-provided select condition
+ // only results satisfying the filter condition would be returned to downstream operators
+ protected ITupleFilter tupleFilter;
+ protected final long outputLimit;
+ protected long outputCount = 0;
+ protected boolean finished;
+
+ // no filter and limit pushdown
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter)
throws HyracksDataException {
this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, false, null,
- null);
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, null, -1,
+ false, null, null);
}
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+ ITupleFilterFactory tupleFilterFactory, long outputLimit) throws HyracksDataException {
+ this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
+ tupleFilterFactory, outputLimit, false, null, null);
+ }
+
+ public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
+ int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
+ boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
+ ITupleFilterFactory tupleFactoryFactory, long outputLimit, boolean appendSearchCallbackProceedResult,
+ byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue)
+ throws HyracksDataException {
this.ctx = ctx;
this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.retainInput = retainInput;
@@ -138,6 +163,12 @@
if (ctx.getStatsCollector() != null) {
ctx.getStatsCollector().add(stats);
}
+ this.tupleFilterFactory = tupleFactoryFactory;
+ this.outputLimit = outputLimit;
+
+ if (this.tupleFilterFactory != null && this.retainMissing) {
+ throw new IllegalStateException("RetainMissing with tuple filter is not supported");
+ }
}
protected abstract ISearchPredicate createSearchPredicate();
@@ -179,6 +210,13 @@
buildMissingTuple(numIndexFilterFields, nonFilterTupleBuild, nonMatchWriter);
}
+ if (tupleFilterFactory != null) {
+ tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
+ referenceFilterTuple = new ReferenceFrameTupleReference();
+ }
+ finished = false;
+ outputCount = 0;
+
try {
searchPred = createSearchPredicate();
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
@@ -201,9 +239,14 @@
protected void writeSearchResults(int tupleIndex) throws Exception {
long matchingTupleCount = 0;
while (cursor.hasNext()) {
- matchingTupleCount++;
- tb.reset();
cursor.next();
+ matchingTupleCount++;
+ ITupleReference tuple = cursor.getTuple();
+ if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
+ continue;
+ }
+ tb.reset();
+
if (retainInput) {
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -211,7 +254,6 @@
tb.addFieldEndOffset();
}
}
- ITupleReference tuple = cursor.getTuple();
writeTupleToOutput(tuple);
if (appendSearchCallbackProceedResult) {
writeSearchCallbackProceedResult(tb,
@@ -222,6 +264,10 @@
writeFilterTupleToOutput(((ILSMIndexCursor) cursor).getFilterMaxTuple());
}
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ if (outputLimit >= 0 && ++outputCount >= outputLimit) {
+ finished = true;
+ break;
+ }
}
stats.getTupleCounter().update(matchingTupleCount);
@@ -237,7 +283,7 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
try {
- for (int i = 0; i < tupleCount; i++) {
+ for (int i = 0; i < tupleCount && !finished; i++) {
resetSearchPredicate(i);
cursor.close();
indexAccessor.search(cursor, searchPred);
@@ -344,4 +390,50 @@
}
}
+ /**
+ * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
+ * is used by ITupleFilter
+ *
+ */
+ private static class ReferenceFrameTupleReference implements IFrameTupleReference {
+ private ITupleReference tuple;
+
+ public IFrameTupleReference reset(ITupleReference tuple) {
+ this.tuple = tuple;
+ return this;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return tuple.getFieldCount();
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return tuple.getFieldData(fIdx);
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return tuple.getFieldStart(fIdx);
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return tuple.getFieldLength(fIdx);
+ }
+
+ @Override
+ public IFrameTupleAccessor getFrameTupleAccessor() {
+ throw new UnsupportedOperationException(
+ "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
+ }
+
+ @Override
+ public int getTupleIndex() {
+ throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
+ }
+
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index 18ced6d..4dcfa94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -55,8 +55,9 @@
boolean appendIndexFilter, boolean appendOpCallbackProceedResult,
byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue)
throws HyracksDataException {
+ // TODO: predicate & limit pushdown not enabled for RTree yet
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
+ retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter, null, -1,
appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue);
if (keyFields != null && keyFields.length > 0) {