[ASTERIXDB-3144][RT] Make index modification runtime support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
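This patch changes the index modification runtime to support
operating on multiple storage partitions. With this change, an
index modification node pushable uses a tuple partitioner to
route each incoming tuple to the index accessor of the storage
partition the tuple belongs to, so a single pushable writes to
multiple indexes representing multiple partitions. To support
this, ILSMHarness#batchOperate now takes the set of tuple indexes
within the frame to process, and IFrameTupleProcessor#process now
also receives the frame's tuple accessor. This is a step towards
achieving compute/storage separation.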
Change-Id: I08da28f2a26fcaf581c2256312455fe541fae5ea
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17452
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index d8d155e..ba932e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -94,5 +94,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index b6192c1..4cd8a11 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public interface IFrameTupleProcessor {
@@ -37,7 +38,7 @@
* the index of the tuple in the frame
* @throws HyracksDataException
*/
- void process(ITupleReference tuple, int index) throws HyracksDataException;
+ void process(FrameTupleAccessor accessor, ITupleReference tuple, int index) throws HyracksDataException;
/**
* Called once per batch before ending the batch process
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..214d9dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.Set;
import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,7 +40,6 @@
* @param tuple
* the operation tuple
* @throws HyracksDataException
- * @throws IndexException
*/
void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
@@ -54,7 +54,6 @@
* the operation tuple
* @return
* @throws HyracksDataException
- * @throws IndexException
*/
boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException;
@@ -69,7 +68,6 @@
* @param pred
* the search predicate
* @throws HyracksDataException
- * @throws IndexException
*/
void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
@@ -104,9 +102,7 @@
* Schedule a merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -114,9 +110,7 @@
* Schedule full merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -125,7 +119,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void merge(ILSMIOOperation operation) throws HyracksDataException;
@@ -133,7 +126,6 @@
* Schedule a flush
*
* @param ctx
- * @param callback
* @throws HyracksDataException
*/
ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +135,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void flush(ILSMIOOperation operation) throws HyracksDataException;
@@ -153,7 +144,6 @@
* @param ioOperation
* the io operation that added the new component
* @throws HyracksDataException
- * @throws IndexException
*/
void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
@@ -235,10 +225,13 @@
* the tuple processor
* @param frameOpCallback
* the callback at the end of the frame
+ * @param tuples
+ * the indexes of tuples to process
* @throws HyracksDataException
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+ throws HyracksDataException;
/**
* Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 02744ee..c64e1e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -47,15 +48,15 @@
public LSMIndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
- ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+ ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+ int[][] partitionsMap) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory,
- tupleFilterFactory);
+ tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
- ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int nextFlushTupleIndex = 0;
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
@@ -68,6 +69,9 @@
}
tuple.reset(accessor, i);
+ int storagePartition = tuplePartitioner.partition(accessor, i);
+ int storageIdx = storagePartitionId2Index.get(storagePartition);
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
switch (op) {
case INSERT: {
if (!lsmAccessor.tryInsert(tuple)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index fb884f7..26ce56a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -33,24 +34,29 @@
public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
protected final int[] fieldPermutation;
protected final IndexOperation op;
protected final IIndexDataflowHelperFactory indexHelperFactory;
protected final IModificationOperationCallbackFactory modCallbackFactory;
protected final ITupleFilterFactory tupleFilterFactory;
+ protected final ITuplePartitionerFactory tuplePartitionerFactory;
+ protected final int[][] partitionsMap;
public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation,
IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
- ITupleFilterFactory tupleFilterFactory) {
+ ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+ int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.modCallbackFactory = modCallbackFactory;
this.tupleFilterFactory = tupleFilterFactory;
this.fieldPermutation = fieldPermutation;
this.op = op;
+ this.tuplePartitionerFactory = tuplePartitionerFactory;
+ this.partitionsMap = partitionsMap;
this.outRecDescs[0] = outRecDesc;
}
@@ -59,6 +65,6 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new LSMIndexInsertUpdateDeleteOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), op, modCallbackFactory,
- tupleFilterFactory);
+ tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 950a8e5..e2c9fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
@@ -711,12 +712,13 @@
@Override
public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+ throws HyracksDataException {
processor.start();
enter(ctx);
try {
try {
- processFrame(accessor, tuple, processor);
+ processFrame(accessor, tuple, processor, tuples);
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -860,13 +862,14 @@
}
private static void processFrame(FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor) throws HyracksDataException {
+ IFrameTupleProcessor processor, Set<Integer> tuples) throws HyracksDataException {
int tupleCount = accessor.getTupleCount();
- int i = 0;
- while (i < tupleCount) {
+ for (int i = 0; i < tupleCount; i++) {
+ if (!tuples.contains(i)) {
+ continue;
+ }
tuple.reset(accessor, i);
- processor.process(tuple, i);
- i++;
+ processor.process(accessor, tuple, i);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..fb5984d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import java.util.Set;
import java.util.function.Predicate;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -210,8 +211,8 @@
}
public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
- IFrameOperationCallback frameOpCallback) throws HyracksDataException {
- lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+ IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
}
@Override