[ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Add internal query-partition() to get all tuples in a partition.
Ext-ref: MB-62720
Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 52cd6cf..f3881e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -110,6 +110,10 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
new file mode 100644
index 0000000..dfa59b7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+/**
+ * Descriptor of a B-tree search operator that is tied to one target storage partition.
+ * <p>
+ * It takes the same arguments as {@link BTreeSearchOperatorDescriptor} plus {@code targetStoragePartition},
+ * and the runtime it creates is a {@link BTreePartitionSearchOperatorNodePushable} that is handed that
+ * partition id.
+ */
+public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Storage partition id that is forwarded, unchanged, to every runtime created by this descriptor. */
+    private final int targetStoragePartition;
+
+    /**
+     * Creates the descriptor.
+     * <p>
+     * Every argument except the last is passed straight to the {@link BTreeSearchOperatorDescriptor}
+     * constructor, in the same order.
+     *
+     * @param targetStoragePartition
+     *            id of the storage partition the created runtime is given as its search target
+     */
+    public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
+            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
+            boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) {
+        super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                indexHelperFactory, retainInput, retainMissing, missingWriterFactory, searchCallbackFactory,
+                minFilterFieldIndexes, maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory,
+                tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue,
+                tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
+        this.targetStoragePartition = targetStoragePartition;
+    }
+
+    /**
+     * @return the fixed display name {@code "BTree Partition Search"}
+     */
+    @Override
+    public String getDisplayName() {
+        return "BTree Partition Search";
+    }
+
+    /**
+     * Builds the runtime for one operator partition.
+     * <p>
+     * The input record descriptor is looked up for input 0 of this activity; all other settings come from
+     * the fields of this descriptor, with {@code targetStoragePartition} appended as the last argument.
+     *
+     * @param ctx
+     *            task context handed to the runtime
+     * @param recordDescProvider
+     *            source of the input record descriptor
+     * @param partition
+     *            partition number handed to the runtime
+     * @param nPartitions
+     *            not used by this method
+     * @return a new {@link BTreePartitionSearchOperatorNodePushable}
+     * @throws HyracksDataException
+     *             if constructing the runtime fails
+     */
+    @Override
+    public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        return new BTreePartitionSearchOperatorNodePushable(ctx, partition, inputRecDesc, lowKeyFields,
+                highKeyFields, lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes,
+                indexHelperFactory, retainInput, retainMissing, missingWriterFactory, searchCallbackFactory,
+                appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
+                appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+                searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory,
+                partitionsMap, targetStoragePartition);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
new file mode 100644
index 0000000..7518791
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+/**
+ * B-tree search runtime that searches only the index partition selected by a target storage partition id.
+ * <p>
+ * The id is translated once, in the constructor, into a position in the {@code cursors} /
+ * {@code indexAccessors} arrays via the inherited {@code storagePartitionId2Index} map. If the id has no
+ * entry in that map, or the resulting position is outside {@code cursors}, incoming frames are accepted
+ * but no search is run for them.
+ */
+public class BTreePartitionSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
+
+    /** Stored in {@link #pIdx} when the target storage partition id has no entry in the id-to-index map. */
+    private static final int PARTITION_NOT_FOUND = Integer.MIN_VALUE;
+
+    /** Position of the target partition in {@code cursors} and {@code indexAccessors}. */
+    private final int pIdx;
+
+    /**
+     * Creates the runtime.
+     * <p>
+     * Every argument except the last is passed to the {@link BTreeSearchOperatorNodePushable} constructor.
+     *
+     * @param targetStoragePartition
+     *            storage partition id to search; looked up in {@code storagePartitionId2Index} after the
+     *            superclass constructor has run (NOTE(review): the map is populated outside this class --
+     *            presumably by that constructor; confirm)
+     * @throws HyracksDataException
+     *             if the superclass constructor fails
+     */
+    public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            RecordDescriptor inputRecDesc, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+            boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
+            IMissingWriterFactory nonMatchWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
+            boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean appendOpCallbackProceedResult,
+            byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue,
+            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap, int targetStoragePartition) throws HyracksDataException {
+        super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
+                nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory,
+                tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory,
+                tuplePartitionerFactory, partitionsMap);
+        this.pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition, PARTITION_NOT_FOUND);
+    }
+
+    /**
+     * Points the frame accessor at {@code buffer} and runs one search per tuple against the target
+     * partition.
+     *
+     * @param buffer
+     *            incoming frame
+     * @throws HyracksDataException
+     *             wrapping, via {@link HyracksDataException#create(Throwable)}, any exception raised
+     *             while searching
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        // Read the count before entering the try block so only search failures get wrapped.
+        final int numTuples = accessor.getTupleCount();
+        try {
+            searchPartition(numTuples);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Searches the target partition once for each of the first {@code tupleCount} tuples of the current
+     * frame, stopping early as soon as the inherited {@code finished} flag is set.
+     * <p>
+     * Does nothing when {@link #pIdx} is not a valid position in {@code cursors}.
+     *
+     * @param tupleCount
+     *            number of tuples in the current frame
+     * @throws Exception
+     *             propagated from the search calls
+     */
+    private void searchPartition(int tupleCount) throws Exception {
+        boolean inRange = pIdx >= 0 && pIdx < cursors.length;
+        if (!inRange) {
+            return;
+        }
+        int tupleIndex = 0;
+        while (tupleIndex < tupleCount && !finished) {
+            resetSearchPredicate(tupleIndex);
+            // The same cursor is reused for every tuple, so close it before starting the next search.
+            cursors[pIdx].close();
+            indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+            writeSearchResults(tupleIndex, cursors[pIdx]);
+            tupleIndex++;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 91b87c6..da5df23 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -120,7 +120,7 @@
protected final ITupleProjector tupleProjector;
protected final ITuplePartitioner tuplePartitioner;
protected final int[] partitions;
- private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
+ protected final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,