[ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Add an internal query-partition() function that returns all tuples in a given storage partition.

Ext-ref: MB-62720
Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 52cd6cf..f3881e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -110,6 +110,10 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
new file mode 100644
index 0000000..dfa59b7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
+    // Variant of the BTree search descriptor whose runtime targets a single storage partition.
+    private static final long serialVersionUID = 1L;
+    private final int targetStoragePartition; // storage partition id handed to the node pushable
+
+    public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
+            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+            boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) {
+        super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
+                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
+                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
+                appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+                searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+        this.targetStoragePartition = targetStoragePartition; // the only state added on top of the superclass
+    }
+
+    @Override // forwards this descriptor's search settings plus targetStoragePartition to the pushable
+    public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new BTreePartitionSearchOperatorNodePushable(ctx, partition,
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
+                lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
+                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
+                nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
+                tuplePartitionerFactory, partitionsMap, targetStoragePartition);
+    }
+
+    @Override
+    public String getDisplayName() {
+        return "BTree Partition Search";
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
new file mode 100644
index 0000000..7518791
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
+    // BTree search pushable that probes only the index mapped to the target storage partition.
+    private final int pIdx; // position in cursors/indexAccessors; Integer.MIN_VALUE when the target is not mapped
+
+    public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+            boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory nonMatchWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+            byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue,
+            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap, int targetStoragePartition) throws HyracksDataException {
+        super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
+                nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory,
+                tupleFilterFactory, outputLimit, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+                searchCallbackProceedResultTrueValue, projectorFactory, tuplePartitionerFactory, partitionsMap);
+        pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition, Integer.MIN_VALUE);
+    }
+
+    @Override // handles one input frame by searching the target partition once per input tuple
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            searchPartition(tupleCount);
+        } catch (Exception e) { // any failure is rethrown as a HyracksDataException with the cause kept
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void searchPartition(int tupleCount) throws Exception {
+        if (pIdx >= 0 && pIdx < cursors.length) { // otherwise nothing is searched or written
+            for (int i = 0; i < tupleCount && !finished; i++) { // stops early once finished is set
+                resetSearchPredicate(i); // presumably rebuilds searchPred from input tuple i -- confirm in parent
+                cursors[pIdx].close(); // closed before the same cursor is reused by the search below
+                indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+                writeSearchResults(i, cursors[pIdx]); // NOTE(review): defined in a superclass; assumed to emit matches
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 91b87c6..da5df23 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -120,7 +120,7 @@
     protected final ITupleProjector tupleProjector;
     protected final ITuplePartitioner tuplePartitioner;
     protected final int[] partitions;
-    private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
+    protected final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,