[ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch changes the dump_index() function to support
operating on multiple partitions.
Change-Id: I8754069a7340c0d9e3bf69e1fe5c94eb333b73b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17513
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index 691be47..bd72a66 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -39,27 +39,30 @@
private final IndexDataflowHelperFactory indexDataflowHelperFactory;
private final RecordDescriptor recDesc;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final AlgebricksAbsolutePartitionConstraint storageLocations;
+ private final AlgebricksAbsolutePartitionConstraint constraint;
+ private final int[][] partitionsMap;
public DumpIndexDatasource(INodeDomain domain, IndexDataflowHelperFactory indexDataflowHelperFactory,
RecordDescriptor recDesc, IBinaryComparatorFactory[] comparatorFactories,
- AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException {
+ AlgebricksAbsolutePartitionConstraint constraint, int[][] partitionsMap) throws AlgebricksException {
super(DUMP_INDEX_DATASOURCE_ID, DumpIndexRewriter.DUMP_INDEX, domain);
this.indexDataflowHelperFactory = indexDataflowHelperFactory;
this.recDesc = recDesc;
this.comparatorFactories = comparatorFactories;
- this.storageLocations = storageLocations;
+ this.constraint = constraint;
+ this.partitionsMap = partitionsMap;
}
@Override
protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- return storageLocations;
+ return constraint;
}
@Override
protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
AlgebricksAbsolutePartitionConstraint locations) {
- return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories);
+ return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories,
+ partitionsMap);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
index 2fdbef3..fcfe3e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
@@ -31,25 +31,31 @@
public class DumpIndexFunction extends AbstractDatasourceFunction {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IndexDataflowHelperFactory indexDataflowHelperFactory;
private final RecordDescriptor recDesc;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int[][] partitionsMap;
public DumpIndexFunction(AlgebricksAbsolutePartitionConstraint locations,
IndexDataflowHelperFactory indexDataflowHelperFactory, RecordDescriptor recDesc,
- IBinaryComparatorFactory[] comparatorFactories) {
+ IBinaryComparatorFactory[] comparatorFactories, int[][] partitionsMap) {
super(locations);
this.indexDataflowHelperFactory = indexDataflowHelperFactory;
this.recDesc = recDesc;
this.comparatorFactories = comparatorFactories;
+ this.partitionsMap = partitionsMap;
}
@Override
public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
- final IIndexDataflowHelper indexDataflowHelper = indexDataflowHelperFactory.create(serviceCtx, partition);
- return new DumpIndexReader(indexDataflowHelper, recDesc, comparatorFactories);
+ int[] partitions = partitionsMap[partition];
+ final IIndexDataflowHelper[] indexDataflowHelpers = new IIndexDataflowHelper[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexDataflowHelpers[i] = indexDataflowHelperFactory.create(serviceCtx, partitions[i]);
+ }
+ return new DumpIndexReader(indexDataflowHelpers, recDesc, comparatorFactories);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 8ef094e..5c5a218 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -34,12 +34,13 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -48,38 +49,50 @@
public class DumpIndexReader extends FunctionReader {
private final CharArrayRecord record;
- private final IIndexCursor searchCursor;
+ private final IIndexCursor[] searchCursors;
private final RecordDescriptor secondaryRecDesc;
private final StringBuilder recordBuilder = new StringBuilder();
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream dis = new DataInputStream(bbis);
- private final IIndexDataflowHelper indexDataflowHelper;
- private final IIndexAccessor accessor;
+ private final IIndexDataflowHelper[] indexDataflowHelpers;
+ private final IIndexAccessor[] accessors;
+ private int currentSearchIdx;
- public DumpIndexReader(IIndexDataflowHelper indexDataflowHelper, RecordDescriptor secondaryRecDesc,
+ public DumpIndexReader(IIndexDataflowHelper[] indexDataflowHelpers, RecordDescriptor secondaryRecDesc,
IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException {
- this.indexDataflowHelper = indexDataflowHelper;
+ this.indexDataflowHelpers = indexDataflowHelpers;
this.secondaryRecDesc = secondaryRecDesc;
- indexDataflowHelper.open();
- IIndex indexInstance = indexDataflowHelper.getIndexInstance();
- accessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- searchCursor = accessor.createSearchCursor(false);
MultiComparator searchMultiComparator = MultiComparator.create(comparatorFactories);
RangePredicate rangePredicate =
new RangePredicate(null, null, true, true, searchMultiComparator, searchMultiComparator, null, null);
- accessor.search(searchCursor, rangePredicate);
+ this.accessors = new IIndexAccessor[indexDataflowHelpers.length];
+ this.searchCursors = new IIndexCursor[indexDataflowHelpers.length];
+ for (int i = 0; i < indexDataflowHelpers.length; i++) {
+ IIndexDataflowHelper indexDataflowHelper = indexDataflowHelpers[i];
+ indexDataflowHelper.open();
+ accessors[i] = indexDataflowHelper.getIndexInstance().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ searchCursors[i] = accessors[i].createSearchCursor(false);
+ accessors[i].search(searchCursors[i], rangePredicate);
+ }
+ currentSearchIdx = 0;
record = new CharArrayRecord();
}
@Override
public boolean hasNext() throws Exception {
- return searchCursor.hasNext();
+ while (currentSearchIdx < searchCursors.length) {
+ if (searchCursors[currentSearchIdx].hasNext()) {
+ return true;
+ }
+ currentSearchIdx++;
+ }
+ return false;
}
@Override
public IRawRecord<char[]> next() throws IOException, InterruptedException {
- searchCursor.next();
- ITupleReference tuple = searchCursor.getTuple();
+ searchCursors[currentSearchIdx].next();
+ ITupleReference tuple = searchCursors[currentSearchIdx].getTuple();
buildJsonRecord(tuple);
record.reset();
record.append(recordBuilder.toString().toCharArray());
@@ -89,16 +102,21 @@
@Override
public void close() throws IOException {
- bbis.close();
- dis.close();
- if (searchCursor != null) {
- searchCursor.close();
- searchCursor.destroy();
+ Throwable failure = releaseResources();
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
- if (accessor != null) {
- accessor.destroy();
+ }
+
+ private Throwable releaseResources() {
+ Throwable failure = CleanupUtils.close(bbis, null);
+ failure = CleanupUtils.close(dis, failure);
+ for (int i = 0; i < indexDataflowHelpers.length; i++) {
+ failure = ResourceReleaseUtils.close(searchCursors[i], failure);
+ failure = CleanupUtils.destroy(failure, searchCursors[i], accessors[i]);
+ failure = ResourceReleaseUtils.close(indexDataflowHelpers[i], failure);
}
- indexDataflowHelper.close();
+ return failure;
}
private void buildJsonRecord(ITupleReference tuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
index 6c0382d..dac1ac7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.function;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.functions.FunctionConstants;
@@ -66,13 +67,15 @@
}
ISecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, loc);
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
IndexDataflowHelperFactory indexDataflowHelperFactory =
new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- secondaryIndexHelper.getSecondaryFileSplitProvider());
+ partitioningProperties.getSpiltsProvider());
AlgebricksAbsolutePartitionConstraint secondaryPartitionConstraint =
- (AlgebricksAbsolutePartitionConstraint) secondaryIndexHelper.getSecondaryPartitionConstraint();
+ (AlgebricksAbsolutePartitionConstraint) partitioningProperties.getConstraints();
return new DumpIndexDatasource(context.getComputationNodeDomain(), indexDataflowHelperFactory,
secondaryIndexHelper.getSecondaryRecDesc(), secondaryIndexHelper.getSecondaryComparatorFactories(),
- secondaryPartitionConstraint);
+ secondaryPartitionConstraint, partitioningProperties.getComputeStorageMap());
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 1b4fd23..64dce1e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -63,7 +63,7 @@
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {
super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
-
+ //TODO(partitioning) correlated
this.primaryIndexHelper =
primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.secondaryIndexHelper =