[NO ISSUE][COMP] LIMIT pushdown for external scan
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Push LIMIT into external scan operator
Change-Id: I51d079f3fc190286a1c108be6a19ce7cffb7f8a1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8884
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
index b7eb8b3..22dad3a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
@@ -205,7 +205,8 @@
&& op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
return false;
}
- if (((DataSource) op.getDataSource()).getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+ byte datasourceType = ((DataSource) op.getDataSource()).getDatasourceType();
+ if (datasourceType != DataSource.Type.INTERNAL_DATASET && datasourceType != DataSource.Type.EXTERNAL_DATASET) {
return false;
}
if (!op.getScanVariables().containsAll(selectedVariables)) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
new file mode 100644
index 0000000..5dcc87b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+ USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
new file mode 100644
index 0000000..8a71219
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
new file mode 100644
index 0000000..fe7ffbf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..5dcc87b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+ USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
new file mode 100644
index 0000000..cf98178
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
new file mode 100644
index 0000000..0c52414
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
new file mode 100644
index 0000000..b38ed8b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$16]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ exchange
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ limit 5
+ -- STREAM_LIMIT |PARTITIONED|
+ project ([$$16])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$16] <- [$$t.getField(0)]
+ -- ASSIGN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$t] <- test.ds1 condition (gt($$t.getField(0), 2)) limit 5
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
new file mode 100644
index 0000000..2288987
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
@@ -0,0 +1,5 @@
+3
+4
+5
+6
+7
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
new file mode 100644
index 0000000..2c98237
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$13]
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ limit 5
+ -- STREAM_LIMIT |UNPARTITIONED|
+ exchange
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ project ([$$13])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$13] <- [$$t.getField(0)]
+ -- ASSIGN |PARTITIONED|
+ limit 5
+ -- STREAM_LIMIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$t] <- test.ds1 limit 5
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
new file mode 100644
index 0000000..85954ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
@@ -0,0 +1,5 @@
+1
+2
+3
+4
+5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 63912d6..a99b121 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13944,6 +13944,16 @@
</compilation-unit>
</test-case>
<test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-external-scan">
+ <output-dir compare="Text">push-limit-to-external-scan</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="limit">
+ <compilation-unit name="push-limit-to-external-scan-select">
+ <output-dir compare="Text">push-limit-to-external-scan-select</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="limit">
<compilation-unit name="push-limit-to-primary-scan">
<output-dir compare="Text">push-limit-to-primary-scan</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 8fc70b8..143b805 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
/**
* A super interface implemented by a data source adapter. An adapter can be a
@@ -46,9 +47,33 @@
* write frame to. Adapter packs the fetched bytes (from external source),
* packs them into frames and forwards the frames to an upstream receiving
* operator using the instance of IFrameWriter.
+ * @param tupleFilter
+ * If not {@code null} then only tuple matching this filter should be emitted
+ * @param outputLimit
+ * Limits the number of tuples that should be emitted
+ * (after applying the filter if it's specified).
+ * {@code -1} if there is not limit.
* @throws Exception
*/
- public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
+ void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException, InterruptedException;
+
+ /**
+ * Triggers the adapter to begin ingesting data from the external source.
+ *
+ * @param partition
+ * The adapter could be running with a degree of parallelism.
+ * partition corresponds to the i'th parallel instance.
+ * @param writer
+ * The instance of frame writer that is used by the adapter to
+ * write frame to. Adapter packs the fetched bytes (from external source),
+ * packs them into frames and forwards the frames to an upstream receiving
+ * operator using the instance of IFrameWriter.
+ * @throws Exception
+ */
+ default void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
+ start(partition, writer, null, -1);
+ }
/**
* @return The number of processed tuples by this adapter
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index ccc420b..527e342 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -22,11 +22,13 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
@FunctionalInterface
public interface IDataFlowController {
- public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException, InterruptedException;
public default boolean pause() throws HyracksDataException {
throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 7a089b8..efb0f13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,7 +68,11 @@
}
@Override
- public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException, InterruptedException {
+ if (tupleFilter != null || outputLimit >= 0) {
+ throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+ }
synchronized (this) {
if (state == State.STOPPED) {
return;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 4deb422..b42e6de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -18,12 +18,15 @@
*/
package org.apache.asterix.external.dataflow;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
@@ -39,7 +42,10 @@
}
@Override
- public void start(IFrameWriter writer) throws HyracksDataException {
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
+ if (tupleFilter != null || outputLimit >= 0) {
+ throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+ }
try {
tupleForwarder = new TupleForwarder(ctx, writer);
while (true) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 2c19f9d..ce49ccf8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -26,6 +26,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
public class RecordDataFlowController<T> extends AbstractDataFlowController {
@@ -42,17 +45,29 @@
}
@Override
- public void start(IFrameWriter writer) throws HyracksDataException {
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
try {
processedTuples = 0;
ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
+ boolean tupleFilterExists = tupleFilter != null;
+ ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+ ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
- while (recordReader.hasNext()) {
+ while ((outputLimit < 0 || processedTuples < outputLimit) && recordReader.hasNext()) {
IRawRecord<? extends T> record = recordReader.next();
tb.reset();
if (dataParser.parse(record, tb.getDataOutput())) {
tb.addFieldEndOffset();
appendOtherTupleFields(tb);
+
+ if (tupleFilterExists) {
+ tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+ frameTupleRef.reset(tupleRef);
+ if (!tupleFilter.accept(frameTupleRef)) {
+ continue;
+ }
+ }
+
tupleForwarder.addTuple(tb);
processedTuples++;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 9c11c97..85320e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -23,6 +23,9 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
public class StreamDataFlowController extends AbstractDataFlowController {
private final IStreamDataParser dataParser;
@@ -33,17 +36,29 @@
}
@Override
- public void start(IFrameWriter writer) throws HyracksDataException {
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
try {
processedTuples = 0;
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ boolean tupleFilterExists = tupleFilter != null;
+ ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+ ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
- while (true) {
+ while (outputLimit < 0 || processedTuples < outputLimit) {
tb.reset();
if (!dataParser.parse(tb.getDataOutput())) {
break;
}
tb.addFieldEndOffset();
+
+ if (tupleFilterExists) {
+ tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+ frameTupleRef.reset(tupleRef);
+ if (!tupleFilter.accept(frameTupleRef)) {
+ continue;
+ }
+ }
+
tupleForwarder.addTuple(tb);
processedTuples++;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 123a552..54e633a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -25,6 +25,7 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
public class FeedAdapter implements IDataSourceAdapter, Closeable {
private final AbstractFeedDataFlowController controller;
@@ -34,8 +35,9 @@
}
@Override
- public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
- controller.start(writer);
+ public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException, InterruptedException {
+ controller.start(writer, tupleFilter, outputLimit);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 17a134b..9e75d69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -22,6 +22,7 @@
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
public class GenericAdapter implements IDataSourceAdapter {
@@ -32,8 +33,9 @@
}
@Override
- public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
- controller.start(writer);
+ public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException, InterruptedException {
+ controller.start(writer, tupleFilter, outputLimit);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 1d7623d..82b8113 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -29,6 +29,8 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
/*
* A single activity operator that provides the functionality of scanning data using an
@@ -36,21 +38,31 @@
*/
public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private ITypedAdapterFactory adapterFactory;
+ private final ITypedAdapterFactory adapterFactory;
+
+ private final ITupleFilterFactory tupleFilterFactory;
+
+ private final long outputLimit;
public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
- ITypedAdapterFactory dataSourceAdapterFactory) {
+ ITypedAdapterFactory dataSourceAdapterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit) {
super(spec, 0, 1);
outRecDescs[0] = rDesc;
this.adapterFactory = dataSourceAdapterFactory;
+ this.tupleFilterFactory = tupleFilterFactory;
+ this.outputLimit = outputLimit;
+ }
+
+ public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+ ITypedAdapterFactory dataSourceAdapterFactory) {
+ this(spec, rDesc, dataSourceAdapterFactory, null, -1);
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -64,8 +76,10 @@
}
try {
writer.open();
+ ITupleFilter tupleFilter =
+ tupleFilterFactory != null ? tupleFilterFactory.createTupleFilter(ctx) : null;
adapter = adapterFactory.createAdapter(ctx, partition);
- adapter.start(partition, writer);
+ adapter.start(partition, writer, tupleFilter, outputLimit);
if (stats != null) {
stats.getTupleCounter().update(adapter.getProcessedTuples());
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index effd59f..314cd20 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -26,6 +26,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
@@ -34,6 +36,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -64,7 +67,12 @@
}
@Override
- public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+ public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+ throws HyracksDataException {
+ if (tupleFilter != null || outputLimit >= 0) {
+ throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+ }
+
generator = new DummyGenerator(configuration, pos);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(generator);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
index 708cdd8..14e70a9 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
@@ -21,6 +21,7 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
@@ -33,7 +34,7 @@
}
@Override
- public void start(IFrameWriter writer) {
+ public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) {
throw new UnsupportedOperationException();
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 2ac8bf9..29a66b9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -23,8 +23,6 @@
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
@@ -105,10 +103,6 @@
IProjectionInfo<?> projectionInfo) throws AlgebricksException {
switch (dataset.getDatasetType()) {
case EXTERNAL:
- if (tupleFilterFactory != null || outputLimit >= 0) {
- throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
- "Tuple filter and limit are not supported by ExternalDataSource");
- }
Dataset externalDataset = ((DatasetDataSource) dataSource).getDataset();
String itemTypeName = externalDataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
@@ -118,7 +112,8 @@
Map<String, String> properties = addProjectionInfo(projectionInfo, edd.getProperties());
ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
- return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+ return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+ tupleFilterFactory, outputLimit);
case INTERNAL:
DataSourceId id = getId();
DataverseName dataverseName = id.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index d415f73..f5fd7dd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -25,8 +25,6 @@
import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.metadata.api.IDatasourceFunction;
@@ -86,17 +84,14 @@
ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
IProjectionInfo<?> projectionInfo) throws AlgebricksException {
- if (tupleFilterFactory != null || outputLimit >= 0) {
- throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
- "tuple filter and limit are not supported by FunctionDataSource");
- }
GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
FunctionDataSourceFactory factory =
new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
adapterFactory.configure(factory);
- return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+ return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+ tupleFilterFactory, outputLimit);
}
protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index dd8b3b4..e5fd6f7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -995,7 +995,8 @@
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
+ JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Can only scan datasets of records.");
}
@@ -1004,8 +1005,8 @@
getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- ExternalScanOperatorDescriptor dataScanner =
- new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory);
+ ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, scannerDesc,
+ adapterFactory, tupleFilterFactory, outputLimit);
AlgebricksPartitionConstraint constraint;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index fae0d75..a5bc206 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -37,7 +36,6 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -48,6 +46,7 @@
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -233,8 +232,11 @@
cursor.next();
matchingTupleCount++;
ITupleReference tuple = cursor.getTuple();
- if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
- continue;
+ if (tupleFilter != null) {
+ referenceFilterTuple.reset(tuple);
+ if (!tupleFilter.accept(referenceFilterTuple)) {
+ continue;
+ }
}
tb.reset();
@@ -388,52 +390,6 @@
}
}
- /**
- * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
- * is used by ITupleFilter
- *
- */
- protected static class ReferenceFrameTupleReference implements IFrameTupleReference {
- private ITupleReference tuple;
-
- public IFrameTupleReference reset(ITupleReference tuple) {
- this.tuple = tuple;
- return this;
- }
-
- @Override
- public int getFieldCount() {
- return tuple.getFieldCount();
- }
-
- @Override
- public byte[] getFieldData(int fIdx) {
- return tuple.getFieldData(fIdx);
- }
-
- @Override
- public int getFieldStart(int fIdx) {
- return tuple.getFieldStart(fIdx);
- }
-
- @Override
- public int getFieldLength(int fIdx) {
- return tuple.getFieldLength(fIdx);
- }
-
- @Override
- public IFrameTupleAccessor getFrameTupleAccessor() {
- throw new UnsupportedOperationException(
- "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
- }
-
- @Override
- public int getTupleIndex() {
- throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
- }
-
- }
-
@Override
public String getDisplayName() {
return "Index Search";
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
new file mode 100644
index 0000000..f7d8169
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.common.tuples;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
+ * is used by ITupleFilter
+ */
+public final class ReferenceFrameTupleReference implements IFrameTupleReference {
+
+ private ITupleReference tuple;
+
+ public void reset(ITupleReference tuple) {
+ this.tuple = tuple;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return tuple.getFieldCount();
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return tuple.getFieldData(fIdx);
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return tuple.getFieldStart(fIdx);
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return tuple.getFieldLength(fIdx);
+ }
+
+ @Override
+ public IFrameTupleAccessor getFrameTupleAccessor() {
+ throw new UnsupportedOperationException(
+ "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
+ }
+
+ @Override
+ public int getTupleIndex() {
+ throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 596f4b0..dddaab1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -94,8 +94,11 @@
cursor.next();
matchingTupleCount++;
ITupleReference tuple = cursor.getTuple();
- if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
- continue;
+ if (tupleFilter != null) {
+ referenceFilterTuple.reset(tuple);
+ if (!tupleFilter.accept(referenceFilterTuple)) {
+ continue;
+ }
}
tb.reset();