[ASTERIXDB-3144][HYR][RT] Make index creation/drop support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch changes the index creation/drop operators to support
operating on multiple partitions. With this change, an index
creation/drop node pushable will create/drop multiple indexes
representing multiple partitions. This is a step towards
achieving compute/storage separation.
Change-Id: I84375082cb772dc52ee7f51095d7b2323ab658b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17492
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..3d3d1f2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -233,4 +233,12 @@
Assert.assertTrue(nodeFileSplit.isPresent());
return nodeFileSplit.get().getPath();
}
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index 3624a33..add708e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.common.TestDataUtil;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
@@ -174,8 +175,8 @@
// open the index to make it in-use
dataflowHelper.open();
// try to drop in-use index (should fail)
- IndexDropOperatorNodePushable dropInUseOp =
- new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+ IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
+ EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
try {
dropInUseOp.initialize();
} catch (HyracksDataException e) {
@@ -191,7 +192,7 @@
dropFailed.set(false);
// drop with option wait for in-use should be successful once the index is closed
final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
- EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0);
+ EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestDataUtil.getPartitionsMap(1));
Thread dropThread = new Thread(() -> {
try {
dropWithWaitOp.initialize();
@@ -214,8 +215,8 @@
private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) throws Exception {
dropFailed.set(false);
// Dropping non-existing index
- IndexDropOperatorNodePushable dropNonExistingOp =
- new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+ IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
+ EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
try {
dropNonExistingOp.initialize();
} catch (HyracksDataException e) {
@@ -230,8 +231,8 @@
throws Exception {
// Dropping non-existing index with if exists option should be successful
dropFailed.set(false);
- IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp =
- new IndexDropOperatorNodePushable(helperFactory, EnumSet.of(DropOption.IF_EXISTS), ctx, 0);
+ IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
+ EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestDataUtil.getPartitionsMap(1));
try {
dropNonExistingWithIfExistsOp.initialize();
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 304d902..6daceb7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -313,8 +313,9 @@
metadataProvider.getSplitProviderAndConstraints(dataset);
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
IndexDropOperatorDescriptor primaryBtreeDrop =
- new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options);
+ new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options, partitionsMap);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
specPrimary.addRoot(primaryBtreeDrop);
@@ -350,7 +351,9 @@
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first, resourceFactory, true);
- IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+ IndexCreateOperatorDescriptor indexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 7b4587b..161e86e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -149,7 +149,9 @@
mergePolicyFactory, mergePolicyProperties);
IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
fileSplitProvider, resourceFactory, true);
- IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+ IndexCreateOperatorDescriptor indexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
indexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint);
spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index e8825fc..14c9dda 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -60,8 +60,9 @@
mergePolicyFactory, mergePolicyProperties);
IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
secondaryFileSplitProvider, resourceFactory, true);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
IndexCreateOperatorDescriptor secondaryIndexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
secondaryIndexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
@@ -83,8 +84,9 @@
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
// The index drop operation should be persistent regardless of temp datasets or permanent dataset.
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
IndexDropOperatorDescriptor btreeDrop =
- new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions);
+ new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions, partitionsMap);
btreeDrop.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
splitsAndConstraint.second);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 80ce5c4..86525f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -119,7 +119,8 @@
IResourceFactory primaryResourceFactory = createPrimaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
- IndexCreateOperatorDescriptor primaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ IndexCreateOperatorDescriptor primaryCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -162,7 +163,8 @@
IResourceFactory secondaryResourceFactory = createSecondaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
- IndexCreateOperatorDescriptor secondaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ IndexCreateOperatorDescriptor secondaryCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
@@ -269,7 +271,8 @@
protected void destroyPrimaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
- IndexDropOperatorDescriptor primaryDropOp = new IndexDropOperatorDescriptor(spec, primaryHelperFactory);
+ IndexDropOperatorDescriptor primaryDropOp =
+ new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
spec.addRoot(primaryDropOp);
runTest(spec);
@@ -277,7 +280,8 @@
protected void destroySecondaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
- IndexDropOperatorDescriptor secondaryDropOp = new IndexDropOperatorDescriptor(spec, secondaryHelperFactory);
+ IndexDropOperatorDescriptor secondaryDropOp =
+ new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
spec.addRoot(secondaryDropOp);
runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 71ee656..3b16bb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -237,7 +237,8 @@
pageManagerFactory, null, null);
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
- IndexCreateOperatorDescriptor primaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ IndexCreateOperatorDescriptor primaryCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -295,7 +296,8 @@
JobSpecification spec = new JobSpecification();
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
- IndexCreateOperatorDescriptor secondaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+ IndexCreateOperatorDescriptor secondaryCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
@@ -409,7 +411,8 @@
protected void destroyPrimaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
- IndexDropOperatorDescriptor primaryDropOp = new IndexDropOperatorDescriptor(spec, primaryHelperFactory);
+ IndexDropOperatorDescriptor primaryDropOp =
+ new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
spec.addRoot(primaryDropOp);
runTest(spec);
@@ -417,7 +420,8 @@
protected void destroySecondaryIndex() throws Exception {
JobSpecification spec = new JobSpecification();
- IndexDropOperatorDescriptor secondaryDropOp = new IndexDropOperatorDescriptor(spec, secondaryHelperFactory);
+ IndexDropOperatorDescriptor secondaryDropOp =
+ new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
spec.addRoot(secondaryDropOp);
runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 6f59ed8..ee56375 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -236,4 +236,12 @@
outputFiles.add(fileRef.getFile());
return new ManagedFileSplit(ncs.getId(), fileName);
}
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 898321b..61b600a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -25,21 +25,30 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
public class IndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IIndexBuilderFactory indexBuilderFactory;
+ private final int[][] partitionsMap;
- public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory) {
+ public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory,
+ int[][] partitionsMap) {
super(spec, 0, 0);
this.indexBuilderFactory = indexBuilderFactory;
+ this.partitionsMap = partitionsMap;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new IndexCreateOperatorNodePushable(indexBuilderFactory.create(ctx, partition));
+ int[] storagePartitions = partitionsMap[partition];
+ IIndexBuilder[] indexBuilders = new IIndexBuilder[storagePartitions.length];
+ for (int i = 0; i < storagePartitions.length; i++) {
+ indexBuilders[i] = indexBuilderFactory.create(ctx, storagePartitions[i]);
+ }
+ return new IndexCreateOperatorNodePushable(indexBuilders);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
index 1eb6153..9f104fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
@@ -26,10 +26,10 @@
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
public class IndexCreateOperatorNodePushable extends AbstractOperatorNodePushable {
- private final IIndexBuilder indexBuilder;
+ private final IIndexBuilder[] indexBuilders;
- public IndexCreateOperatorNodePushable(IIndexBuilder indexBuilder) throws HyracksDataException {
- this.indexBuilder = indexBuilder;
+ public IndexCreateOperatorNodePushable(IIndexBuilder[] indexBuilders) {
+ this.indexBuilders = indexBuilders;
}
@Override
@@ -49,7 +49,9 @@
@Override
public void initialize() throws HyracksDataException {
- indexBuilder.build();
+ for (IIndexBuilder indexBuilder : indexBuilders) {
+ indexBuilder.build();
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index 032e758..32019f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -36,25 +36,27 @@
WAIT_ON_IN_USE
}
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IIndexDataflowHelperFactory dataflowHelperFactory;
private final Set<DropOption> options;
+ private final int[][] partitionsMap;
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IIndexDataflowHelperFactory dataflowHelperFactory) {
- this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class));
+ IIndexDataflowHelperFactory dataflowHelperFactory, int[][] partitionsMap) {
+ this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class), partitionsMap);
}
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options, int[][] partitionsMap) {
super(spec, 0, 0);
this.dataflowHelperFactory = dataflowHelperFactory;
this.options = options;
+ this.partitionsMap = partitionsMap;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition);
+ return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition, partitionsMap);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 3b6669e..81d280e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -42,13 +42,18 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1);
- private final IIndexDataflowHelper indexHelper;
+ private final IIndexDataflowHelper[] indexHelpers;
private final Set<DropOption> options;
private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30);
public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, Set<DropOption> options,
- IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ IHyracksTaskContext ctx, int partition, int[][] partitionsMap) throws HyracksDataException {
+ int[] storagePartitions = partitionsMap[partition];
+ this.indexHelpers = new IIndexDataflowHelper[storagePartitions.length];
+ for (int i = 0; i < storagePartitions.length; i++) {
+ this.indexHelpers[i] =
+ indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), storagePartitions[i]);
+ }
this.options = options;
}
@@ -69,7 +74,9 @@
@Override
public void initialize() throws HyracksDataException {
- dropIndex();
+ for (IIndexDataflowHelper indexHelper : indexHelpers) {
+ dropIndex(indexHelper);
+ }
}
@Override
@@ -77,7 +84,7 @@
// no op
}
- private void dropIndex() throws HyracksDataException {
+ private void dropIndex(IIndexDataflowHelper indexHelper) throws HyracksDataException {
while (true) {
try {
indexHelper.destroy();