[ASTERIXDB-3144][HYR] Make index search runtime support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch changes the index search runtime to support
operating on multiple partitions. With this change, an index
search node pushable will read from multiple indexes
representing multiple partitions. This is a step towards
achieving compute/storage separation.
Change-Id: Iea8418bdfbca2db9cc5f0aa23c2434f3779e8531
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 9d869e9..9bdbf3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -113,5 +113,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 4fc8057..c6cdd66 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -27,6 +27,8 @@
import org.apache.hyracks.api.dataflow.IIntrospectingOperator;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
@@ -46,7 +48,6 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
@@ -64,22 +65,24 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable
implements IIntrospectingOperator {
static final Logger LOGGER = LogManager.getLogger();
protected final IHyracksTaskContext ctx;
- protected final IIndexDataflowHelper indexHelper;
protected FrameTupleAccessor accessor;
-
protected FrameTupleAppender appender;
protected ArrayTupleBuilder tb;
protected DataOutput dos;
- protected IIndex index;
protected ISearchPredicate searchPred;
- protected IIndexCursor cursor;
- protected IIndexAccessor indexAccessor;
+ protected final IIndexDataflowHelper[] indexHelpers;
+ protected IIndex[] indexes;
+ protected IIndexAccessor[] indexAccessors;
+ protected IIndexCursor[] cursors;
protected final RecordDescriptor inputRecDesc;
protected final boolean retainInput;
@@ -114,28 +117,31 @@
protected long outputCount = 0;
protected boolean finished;
protected final ITupleProjector tupleProjector;
-
- // no filter and limit pushdown
- public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
- int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
- boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
- ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
- this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE);
- }
+ protected final ITuplePartitioner tuplePartitioner;
+ protected final int[] partitions;
+ private final Int2IntMap storagePartitionId2Index = new Int2IntOpenHashMap();
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFactoryFactory, long outputLimit,
+ IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
- throws HyracksDataException {
+ byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ this.partitions = map != null ? map[partition] : new int[] { partition };
+ for (int i = 0; i < partitions.length; i++) {
+ storagePartitionId2Index.put(partitions[i], i);
+ }
+ this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+ this.indexes = new IIndex[partitions.length];
+ this.indexAccessors = new IIndexAccessor[partitions.length];
+ this.cursors = new IIndexCursor[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+ }
this.retainInput = retainInput;
this.retainMissing = retainMissing;
this.appendIndexFilter = appendIndexFilter;
@@ -160,7 +166,7 @@
this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
- this.tupleFilterFactory = tupleFactoryFactory;
+ this.tupleFilterFactory = tupleFilterFactory;
this.outputLimit = outputLimit;
this.stats = new NoOpOperatorStats();
@@ -169,30 +175,43 @@
}
tupleProjector = projectorFactory.createTupleProjector(ctx);
+ tuplePartitioner = tuplePartitionerFactory == null ? null : tuplePartitionerFactory.createPartitioner(ctx);
}
- protected abstract ISearchPredicate createSearchPredicate();
+ protected abstract ISearchPredicate createSearchPredicate(IIndex index);
protected abstract void resetSearchPredicate(int tupleIndex);
// Assigns any index-type specific related accessor parameters
protected abstract void addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws HyracksDataException;
- protected IIndexCursor createCursor() throws HyracksDataException {
- return indexAccessor.createSearchCursor(false);
+ /**
+ * Creates the search cursor for one partition by asking that partition's accessor for it.
+ * This implementation does not read {@code idx}.
+ *
+ * @param idx the index of the partition the cursor is created for
+ * @param idxAccessor the accessor the cursor is obtained from
+ * @return the cursor returned by {@code idxAccessor.createSearchCursor(false)}
+ * @throws HyracksDataException declared by this method; presumably raised when the accessor cannot
+ * create the cursor
+ */
+ protected IIndexCursor createCursor(IIndex idx, IIndexAccessor idxAccessor) throws HyracksDataException {
+ return idxAccessor.createSearchCursor(false);
}
- protected abstract int getFieldCount();
+ protected abstract int getFieldCount(IIndex index);
@Override
public void open() throws HyracksDataException {
writer.open();
- indexHelper.open();
- index = indexHelper.getIndexInstance();
- subscribeForStats(index);
+ ISearchOperationCallback[] searchCallbacks = new ISearchOperationCallback[partitions.length];
+ IIndexAccessParameters[] iaps = new IndexAccessParameters[partitions.length];
+
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i].open();
+ indexes[i] = indexHelpers[i].getIndexInstance();
+ searchCallbacks[i] = searchCallbackFactory
+ .createSearchOperationCallback(indexHelpers[i].getResource().getId(), ctx, null);
+ iaps[i] = new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
+ addAdditionalIndexAccessorParams(iaps[i]);
+ indexAccessors[i] = indexes[i].createAccessor(iaps[i]);
+ cursors[i] = createCursor(indexes[i], indexAccessors[i]);
+ }
+
+ subscribeForStats(indexes[0]);
accessor = new FrameTupleAccessor(inputRecDesc);
if (retainMissing) {
- int fieldCount = getFieldCount();
+ int fieldCount = getFieldCount(indexes[0]);
// Field count in case searchCallback.proceed() result is needed.
int finalFieldCount = appendSearchCallbackProceedResult ? fieldCount + 1 : fieldCount;
nonMatchTupleBuild = new ArrayTupleBuilder(finalFieldCount);
@@ -206,7 +225,7 @@
nonMatchTupleBuild = null;
}
if (appendIndexFilter) {
- int numIndexFilterFields = index.getNumOfFilterFields();
+ int numIndexFilterFields = indexes[0].getNumOfFilterFields();
nonFilterTupleBuild = new ArrayTupleBuilder(numIndexFilterFields);
buildMissingTuple(numIndexFilterFields, nonFilterTupleBuild, nonFilterWriter);
}
@@ -219,16 +238,9 @@
outputCount = 0;
try {
- searchPred = createSearchPredicate();
+ searchPred = createSearchPredicate(indexes[0]);
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- ISearchOperationCallback searchCallback =
- searchCallbackFactory.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null);
- IIndexAccessParameters iap = new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback);
- addAdditionalIndexAccessorParams(iap);
- indexAccessor = index.createAccessor(iap);
- cursor = createCursor();
if (retainInput) {
frameTuple = new FrameTupleReference();
}
@@ -237,7 +249,7 @@
}
}
- protected void writeSearchResults(int tupleIndex) throws Exception {
+ protected void writeSearchResults(int tupleIndex, IIndexCursor cursor) throws Exception {
long matchingTupleCount = 0;
while (cursor.hasNext()) {
cursor.next();
@@ -291,11 +303,10 @@
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
try {
- for (int i = 0; i < tupleCount && !finished; i++) {
- resetSearchPredicate(i);
- cursor.close();
- indexAccessor.search(cursor, searchPred);
- writeSearchResults(i);
+ if (tuplePartitioner != null) {
+ searchPartition(tupleCount);
+ } else {
+ searchAllPartitions(tupleCount);
}
} catch (Exception e) {
throw HyracksDataException.create(e);
@@ -309,40 +320,52 @@
+ /**
+ * Flushes pending output, then releases the per-partition index resources, then closes the writer.
+ * The failure returned by each step is passed into the next one; if any failure remains after the
+ * writer has been closed, it is thrown as a {@link HyracksDataException}.
+ */
@Override
public void close() throws HyracksDataException {
- Throwable failure = releaseResources();
+ Throwable failure = flushFrame();
+ failure = releaseResources(failure);
failure = CleanupUtils.close(writer, failure);
if (failure != null) {
throw HyracksDataException.create(failure);
}
}
- private Throwable releaseResources() {
+ /**
+ * Writes out whatever is still buffered in the appender and records the thread's pinned-page and
+ * cold-read counts in the operator stats. Nothing is done when {@code failed} is set.
+ * If the flush or the stats update throws, {@code writer.fail()} is called and the throwable is
+ * returned rather than thrown; an error raised by {@code writer.fail()} itself is combined with it
+ * through {@code ExceptionUtils.suppress}.
+ *
+ * @return the throwable raised while flushing, or {@code null} if nothing was raised
+ */
+ private Throwable flushFrame() {
Throwable failure = null;
- if (index != null) {
- // if index == null, then the index open was not successful
- if (!failed) {
- try {
- if (appender.getTupleCount() > 0) {
- appender.write(writer, true);
- }
- stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
- stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
- } catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
- // subsequently, the failure will be thrown
- failure = th;
+ if (!failed) {
+ try {
+ if (appender.getTupleCount() > 0) {
+ appender.write(writer, true);
}
- if (failure != null) {
- try {
- writer.fail();
- } catch (Throwable th) {// NOSONAR Must cursor.close is called.
- // subsequently, the failure will be thrown
- failure = ExceptionUtils.suppress(failure, th);
- }
+ stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
+ stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
+ } catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
+ // subsequently, the failure will be thrown
+ failure = th;
+ }
+ if (failure != null) {
+ try {
+ writer.fail();
+ } catch (Throwable th) {
+ // subsequently, the failure will be thrown
+ failure = ExceptionUtils.suppress(failure, th);
}
}
- failure = ResourceReleaseUtils.close(cursor, failure);
- failure = CleanupUtils.destroy(failure, cursor, indexAccessor);
- failure = ResourceReleaseUtils.close(indexHelper, failure);
+ }
+ return failure;
+ }
+
+ /**
+ * Releases the per-partition resources. For every slot whose index was obtained, the cursor is closed,
+ * the cursor and the accessor are handed to {@code CleanupUtils.destroy}, and the dataflow helper is
+ * closed. A throwable escaping one slot is folded into {@code failure} so that the remaining slots are
+ * still released.
+ *
+ * @param failure the failure accumulated before this call; {@code null} if there is none
+ * @return the accumulated failure after all release attempts, as returned by the release helpers
+ */
+ private Throwable releaseResources(Throwable failure) {
+ for (int i = 0; i < indexes.length; i++) {
+ // if index == null, then the index open was not successful
+ try {
+ if (indexes[i] != null) {
+ failure = ResourceReleaseUtils.close(cursors[i], failure);
+ failure = CleanupUtils.destroy(failure, cursors[i], indexAccessors[i]);
+ failure = ResourceReleaseUtils.close(indexHelpers[i], failure);
+ }
+ } catch (Throwable th) {// NOSONAR ensure closing other indexes
+ // subsequently, the failure will be thrown
+ failure = ExceptionUtils.suppress(failure, th);
+ }
}
return failure;
}
@@ -413,4 +436,25 @@
this.stats = stats;
}
+ /**
+ * Searches, for each input tuple, only the index of the storage partition that
+ * {@code tuplePartitioner} routes the tuple to. Stops early once {@code finished} is set.
+ *
+ * @param tupleCount the number of input tuples to search for
+ * @throws IllegalStateException if a tuple is routed to a storage partition that was not registered in
+ * {@code storagePartitionId2Index}
+ * @throws Exception if the search itself or writing its results fails
+ */
+ private void searchPartition(int tupleCount) throws Exception {
+ for (int i = 0; i < tupleCount && !finished; i++) {
+ int storagePartition = tuplePartitioner.partition(accessor, i);
+ // A plain get() on an unmapped key returns the map's default value (0 unless reconfigured), which
+ // would silently search the first local index. Slots are array positions, so -1 is a safe sentinel.
+ int pIdx = storagePartitionId2Index.getOrDefault(storagePartition, -1);
+ if (pIdx < 0) {
+ throw new IllegalStateException(
+ "storage partition " + storagePartition + " is not handled by this index search operator");
+ }
+ resetSearchPredicate(i);
+ cursors[pIdx].close();
+ indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+ writeSearchResults(i, cursors[pIdx]);
+ }
+ }
+
+ /**
+ * Runs the search for every input tuple against each local partition's index in turn: all tuples are
+ * searched in the first partition, then all tuples in the next one, and so on. Within a partition the
+ * tuple loop stops as soon as {@code finished} is set.
+ *
+ * @param tupleCount the number of input tuples to search for
+ * @throws Exception if a search or writing its results fails
+ */
+ private void searchAllPartitions(int tupleCount) throws Exception {
+ for (int partitionIdx = 0; partitionIdx < partitions.length; partitionIdx++) {
+ int tupleIdx = 0;
+ while (tupleIdx < tupleCount && !finished) {
+ resetSearchPredicate(tupleIdx);
+ // the cursor is closed before every search so it can be reused for the next tuple
+ cursors[partitionIdx].close();
+ indexAccessors[partitionIdx].search(cursors[partitionIdx], searchPred);
+ writeSearchResults(tupleIdx, cursors[partitionIdx]);
+ tupleIdx++;
+ }
+ }
+ }
}