[NO ISSUE][COMP] LIMIT pushdown for external scan

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Push LIMIT into external scan operator
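- Also push the SELECT condition below the LIMIT into the external scan as a
  tuple filter (exercised by the new push-limit-to-external-scan-select test)
- Extend IDataSourceAdapter.start() and IDataFlowController.start() with
  tupleFilter/outputLimit parameters (-1 means no limit)
- Move ReferenceFrameTupleReference out of IndexSearchOperatorNodePushable
  into its own class so the external data-flow controllers can reuse it

Example (taken from the new runtimets tests added below; test.ds1 is the
external dataset they create over data/csv/sample_12.csv):

  USE test;

  SELECT VALUE f1
  FROM ds1 t
  WHERE f1 > 2
  LIMIT 5

The optimized plan now carries both the condition and the limit on the scan:

  data-scan []<-[$$t] <- test.ds1 condition (gt($$t.getField(0), 2)) limit 5
  -- DATASOURCE_SCAN  |PARTITIONED|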

Change-Id: I51d079f3fc190286a1c108be6a19ce7cffb7f8a1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8884
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
index b7eb8b3..22dad3a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
@@ -205,7 +205,8 @@
                 && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
             return false;
         }
-        if (((DataSource) op.getDataSource()).getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+        byte datasourceType = ((DataSource) op.getDataSource()).getDatasourceType();
+        if (datasourceType != DataSource.Type.INTERNAL_DATASET && datasourceType != DataSource.Type.EXTERNAL_DATASET) {
             return false;
         }
         if (!op.getScanVariables().containsAll(selectedVariables)) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
new file mode 100644
index 0000000..5dcc87b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+  USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
new file mode 100644
index 0000000..8a71219
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
new file mode 100644
index 0000000..fe7ffbf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..5dcc87b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+  USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
new file mode 100644
index 0000000..cf98178
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
new file mode 100644
index 0000000..0c52414
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
new file mode 100644
index 0000000..b38ed8b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$16]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        limit 5
+        -- STREAM_LIMIT  |PARTITIONED|
+          project ([$$16])
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$16] <- [$$t.getField(0)]
+            -- ASSIGN  |PARTITIONED|
+              exchange
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$t] <- test.ds1 condition (gt($$t.getField(0), 2)) limit 5
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
new file mode 100644
index 0000000..2288987
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
@@ -0,0 +1,5 @@
+3
+4
+5
+6
+7
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
new file mode 100644
index 0000000..2c98237
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$13]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        project ([$$13])
+        -- STREAM_PROJECT  |PARTITIONED|
+          assign [$$13] <- [$$t.getField(0)]
+          -- ASSIGN  |PARTITIONED|
+            limit 5
+            -- STREAM_LIMIT  |PARTITIONED|
+              exchange
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$t] <- test.ds1 limit 5
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
new file mode 100644
index 0000000..85954ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
@@ -0,0 +1,5 @@
+1
+2
+3
+4
+5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 63912d6..a99b121 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13944,6 +13944,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-external-scan">
+        <output-dir compare="Text">push-limit-to-external-scan</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-external-scan-select">
+        <output-dir compare="Text">push-limit-to-external-scan-select</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="limit">
       <compilation-unit name="push-limit-to-primary-scan">
         <output-dir compare="Text">push-limit-to-primary-scan</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 8fc70b8..143b805 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -20,6 +20,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 /**
  * A super interface implemented by a data source adapter. An adapter can be a
@@ -46,9 +47,33 @@
      *            write frame to. Adapter packs the fetched bytes (from external source),
      *            packs them into frames and forwards the frames to an upstream receiving
      *            operator using the instance of IFrameWriter.
+     * @param tupleFilter
+     *            If not {@code null}, then only tuples matching this filter should be emitted
+     * @param outputLimit
+     *            Limits the number of tuples that should be emitted
+     *            (after applying the filter, if one is specified).
+     *            {@code -1} if there is no limit.
      * @throws Exception
      */
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
+    void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException;
+
+    /**
+     * Triggers the adapter to begin ingesting data from the external source.
+     *
+     * @param partition
+     *            The adapter could be running with a degree of parallelism.
+     *            partition corresponds to the i'th parallel instance.
+     * @param writer
+     *            The instance of frame writer that is used by the adapter to
+     *            write frame to. Adapter packs the fetched bytes (from external source),
+     *            packs them into frames and forwards the frames to an upstream receiving
+     *            operator using the instance of IFrameWriter.
+     * @throws Exception
+     */
+    default void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
+        start(partition, writer, null, -1);
+    }
 
     /**
      * @return The number of processed tuples by this adapter
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index ccc420b..527e342 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -22,11 +22,13 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 @FunctionalInterface
 public interface IDataFlowController {
 
-    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException;
 
     public default boolean pause() throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 7a089b8..efb0f13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,7 +68,11 @@
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
         synchronized (this) {
             if (state == State.STOPPED) {
                 return;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 4deb422..b42e6de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -18,12 +18,15 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
 
@@ -39,7 +42,10 @@
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
         try {
             tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 2c19f9d..ce49ccf8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -26,6 +26,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 
 public class RecordDataFlowController<T> extends AbstractDataFlowController {
 
@@ -42,17 +45,29 @@
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
         try {
             processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
+            boolean tupleFilterExists = tupleFilter != null;
+            ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+            ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
-            while (recordReader.hasNext()) {
+            while ((outputLimit < 0 || processedTuples < outputLimit) && recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
                 if (dataParser.parse(record, tb.getDataOutput())) {
                     tb.addFieldEndOffset();
                     appendOtherTupleFields(tb);
+
+                    if (tupleFilterExists) {
+                        tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+                        frameTupleRef.reset(tupleRef);
+                        if (!tupleFilter.accept(frameTupleRef)) {
+                            continue;
+                        }
+                    }
+
                     tupleForwarder.addTuple(tb);
                     processedTuples++;
                 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 9c11c97..85320e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -23,6 +23,9 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 
 public class StreamDataFlowController extends AbstractDataFlowController {
     private final IStreamDataParser dataParser;
@@ -33,17 +36,29 @@
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
         try {
             processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+            boolean tupleFilterExists = tupleFilter != null;
+            ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+            ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
-            while (true) {
+            while (outputLimit < 0 || processedTuples < outputLimit) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
                     break;
                 }
                 tb.addFieldEndOffset();
+
+                if (tupleFilterExists) {
+                    tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+                    frameTupleRef.reset(tupleRef);
+                    if (!tupleFilter.accept(frameTupleRef)) {
+                        continue;
+                    }
+                }
+
                 tupleForwarder.addTuple(tb);
                 processedTuples++;
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 123a552..54e633a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class FeedAdapter implements IDataSourceAdapter, Closeable {
     private final AbstractFeedDataFlowController controller;
@@ -34,8 +35,9 @@
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
-        controller.start(writer);
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        controller.start(writer, tupleFilter, outputLimit);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 17a134b..9e75d69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -22,6 +22,7 @@
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class GenericAdapter implements IDataSourceAdapter {
 
@@ -32,8 +33,9 @@
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
-        controller.start(writer);
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        controller.start(writer, tupleFilter, outputLimit);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 1d7623d..82b8113 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 /*
  * A single activity operator that provides the functionality of scanning data using an
@@ -36,21 +38,31 @@
  */
 public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private ITypedAdapterFactory adapterFactory;
+    private final ITypedAdapterFactory adapterFactory;
+
+    private final ITupleFilterFactory tupleFilterFactory;
+
+    private final long outputLimit;
 
     public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
-            ITypedAdapterFactory dataSourceAdapterFactory) {
+            ITypedAdapterFactory dataSourceAdapterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit) {
         super(spec, 0, 1);
         outRecDescs[0] = rDesc;
         this.adapterFactory = dataSourceAdapterFactory;
+        this.tupleFilterFactory = tupleFilterFactory;
+        this.outputLimit = outputLimit;
+    }
+
+    public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+            ITypedAdapterFactory dataSourceAdapterFactory) {
+        this(spec, rDesc, dataSourceAdapterFactory, null, -1);
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
@@ -64,8 +76,10 @@
                 }
                 try {
                     writer.open();
+                    ITupleFilter tupleFilter =
+                            tupleFilterFactory != null ? tupleFilterFactory.createTupleFilter(ctx) : null;
                     adapter = adapterFactory.createAdapter(ctx, partition);
-                    adapter.start(partition, writer);
+                    adapter.start(partition, writer, tupleFilter, outputLimit);
                     if (stats != null) {
                         stats.getTupleCounter().update(adapter.getProcessedTuples());
                     }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index effd59f..314cd20 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
@@ -34,6 +36,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -64,7 +67,12 @@
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
+
         generator = new DummyGenerator(configuration, pos);
         ExecutorService executor = Executors.newSingleThreadExecutor();
         executor.execute(generator);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
index 708cdd8..14e70a9 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
     TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
@@ -33,7 +34,7 @@
     }
 
     @Override
-    public void start(IFrameWriter writer) {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) {
         throw new UnsupportedOperationException();
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 2ac8bf9..29a66b9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -23,8 +23,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
@@ -105,10 +103,6 @@
             IProjectionInfo<?> projectionInfo) throws AlgebricksException {
         switch (dataset.getDatasetType()) {
             case EXTERNAL:
-                if (tupleFilterFactory != null || outputLimit >= 0) {
-                    throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
-                            "Tuple filter and limit are not supported by ExternalDataSource");
-                }
                 Dataset externalDataset = ((DatasetDataSource) dataSource).getDataset();
                 String itemTypeName = externalDataset.getItemTypeName();
                 IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
@@ -118,7 +112,8 @@
                 Map<String, String> properties = addProjectionInfo(projectionInfo, edd.getProperties());
                 ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
                         edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
-                return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+                return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                        tupleFilterFactory, outputLimit);
             case INTERNAL:
                 DataSourceId id = getId();
                 DataverseName dataverseName = id.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index d415f73..f5fd7dd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -25,8 +25,6 @@
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
 import org.apache.asterix.metadata.api.IDatasourceFunction;
@@ -86,17 +84,14 @@
             ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionInfo<?> projectionInfo) throws AlgebricksException {
-        if (tupleFilterFactory != null || outputLimit >= 0) {
-            throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
-                    "tuple filter and limit are not supported by FunctionDataSource");
-        }
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
         adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
         FunctionDataSourceFactory factory =
                 new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
         adapterFactory.configure(factory);
-        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                tupleFilterFactory, outputLimit);
     }
 
     protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index dd8b3b4..e5fd6f7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -995,7 +995,8 @@
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
+            JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
@@ -1004,8 +1005,8 @@
                 getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-        ExternalScanOperatorDescriptor dataScanner =
-                new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory, tupleFilterFactory, outputLimit);
 
         AlgebricksPartitionConstraint constraint;
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index fae0d75..a5bc206 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -37,7 +36,6 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -48,6 +46,7 @@
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -233,8 +232,11 @@
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
-                continue;
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(tuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
             }
             tb.reset();
 
@@ -388,52 +390,6 @@
         }
     }
 
-    /**
-     * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
-     * is used by ITupleFilter
-     *
-     */
-    protected static class ReferenceFrameTupleReference implements IFrameTupleReference {
-        private ITupleReference tuple;
-
-        public IFrameTupleReference reset(ITupleReference tuple) {
-            this.tuple = tuple;
-            return this;
-        }
-
-        @Override
-        public int getFieldCount() {
-            return tuple.getFieldCount();
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-            return tuple.getFieldData(fIdx);
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            return tuple.getFieldStart(fIdx);
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            return tuple.getFieldLength(fIdx);
-        }
-
-        @Override
-        public IFrameTupleAccessor getFrameTupleAccessor() {
-            throw new UnsupportedOperationException(
-                    "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
-        }
-
-        @Override
-        public int getTupleIndex() {
-            throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
-        }
-
-    }
-
     @Override
     public String getDisplayName() {
         return "Index Search";
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
new file mode 100644
index 0000000..f7d8169
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.common.tuples;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
+ * is used by ITupleFilter
+ */
+public final class ReferenceFrameTupleReference implements IFrameTupleReference {
+
+    private ITupleReference tuple;
+
+    public void reset(ITupleReference tuple) {
+        this.tuple = tuple;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return tuple.getFieldCount();
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return tuple.getFieldData(fIdx);
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return tuple.getFieldStart(fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return tuple.getFieldLength(fIdx);
+    }
+
+    @Override
+    public IFrameTupleAccessor getFrameTupleAccessor() {
+        throw new UnsupportedOperationException(
+                "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
+    }
+
+    @Override
+    public int getTupleIndex() {
+        throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 596f4b0..dddaab1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -94,8 +94,11 @@
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
-                continue;
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(tuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
             }
             tb.reset();