[ASTERIXDB-3144][HYR][RT] Make index creation/drop support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch changes the index creation/drop operators to support
operating on multiple partitions. With this change, an index
creation/drop node pushable will create/drop multiple indexes
representing multiple partitions. This is a step towards
achieving compute/storage separation.

Change-Id: I84375082cb772dc52ee7f51095d7b2323ab658b7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17492
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..3d3d1f2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -233,4 +233,12 @@
         Assert.assertTrue(nodeFileSplit.isPresent());
         return nodeFileSplit.get().getPath();
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index 3624a33..add708e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.common.TestDataUtil;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.DataverseName;
@@ -174,8 +175,8 @@
         // open the index to make it in-use
         dataflowHelper.open();
         // try to drop in-use index (should fail)
-        IndexDropOperatorNodePushable dropInUseOp =
-                new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+        IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
         try {
             dropInUseOp.initialize();
         } catch (HyracksDataException e) {
@@ -191,7 +192,7 @@
         dropFailed.set(false);
         // drop with option wait for in-use should be successful once the index is closed
         final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0);
+                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestDataUtil.getPartitionsMap(1));
         Thread dropThread = new Thread(() -> {
             try {
                 dropWithWaitOp.initialize();
@@ -214,8 +215,8 @@
     private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) throws Exception {
         dropFailed.set(false);
         // Dropping non-existing index
-        IndexDropOperatorNodePushable dropNonExistingOp =
-                new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0);
+        IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
         try {
             dropNonExistingOp.initialize();
         } catch (HyracksDataException e) {
@@ -230,8 +231,8 @@
             throws Exception {
         // Dropping non-existing index with if exists option should be successful
         dropFailed.set(false);
-        IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp =
-                new IndexDropOperatorNodePushable(helperFactory, EnumSet.of(DropOption.IF_EXISTS), ctx, 0);
+        IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
+                EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestDataUtil.getPartitionsMap(1));
         try {
             dropNonExistingWithIfExistsOp.initialize();
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 304d902..6daceb7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -313,8 +313,9 @@
                 metadataProvider.getSplitProviderAndConstraints(dataset);
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         IndexDropOperatorDescriptor primaryBtreeDrop =
-                new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options);
+                new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options, partitionsMap);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
         specPrimary.addRoot(primaryBtreeDrop);
@@ -350,7 +351,9 @@
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                         splitsAndConstraint.first, resourceFactory, true);
-        IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+        IndexCreateOperatorDescriptor indexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
         spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 7b4587b..161e86e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -149,7 +149,9 @@
                 mergePolicyFactory, mergePolicyProperties);
         IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                 fileSplitProvider, resourceFactory, true);
-        IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+        IndexCreateOperatorDescriptor indexCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
         indexCreateOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint);
         spec.addRoot(indexCreateOp);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index e8825fc..14c9dda 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -60,8 +60,9 @@
                 mergePolicyFactory, mergePolicyProperties);
         IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                 secondaryFileSplitProvider, resourceFactory, true);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         IndexCreateOperatorDescriptor secondaryIndexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap);
         secondaryIndexCreateOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
@@ -83,8 +84,9 @@
         IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
         // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         IndexDropOperatorDescriptor btreeDrop =
-                new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions);
+                new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions, partitionsMap);
         btreeDrop.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
                 splitsAndConstraint.second);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 80ce5c4..86525f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -119,7 +119,8 @@
         IResourceFactory primaryResourceFactory = createPrimaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
-        IndexCreateOperatorDescriptor primaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        IndexCreateOperatorDescriptor primaryCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -162,7 +163,8 @@
         IResourceFactory secondaryResourceFactory = createSecondaryResourceFactory();
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
-        IndexCreateOperatorDescriptor secondaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        IndexCreateOperatorDescriptor secondaryCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -269,7 +271,8 @@
 
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
-        IndexDropOperatorDescriptor primaryDropOp = new IndexDropOperatorDescriptor(spec, primaryHelperFactory);
+        IndexDropOperatorDescriptor primaryDropOp =
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -277,7 +280,8 @@
 
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
-        IndexDropOperatorDescriptor secondaryDropOp = new IndexDropOperatorDescriptor(spec, secondaryHelperFactory);
+        IndexDropOperatorDescriptor secondaryDropOp =
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 71ee656..3b16bb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -237,7 +237,8 @@
                 pageManagerFactory, null, null);
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
-        IndexCreateOperatorDescriptor primaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        IndexCreateOperatorDescriptor primaryCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -295,7 +296,8 @@
         JobSpecification spec = new JobSpecification();
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
-        IndexCreateOperatorDescriptor secondaryCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
+        IndexCreateOperatorDescriptor secondaryCreateOp =
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -409,7 +411,8 @@
 
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
-        IndexDropOperatorDescriptor primaryDropOp = new IndexDropOperatorDescriptor(spec, primaryHelperFactory);
+        IndexDropOperatorDescriptor primaryDropOp =
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -417,7 +420,8 @@
 
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
-        IndexDropOperatorDescriptor secondaryDropOp = new IndexDropOperatorDescriptor(spec, secondaryHelperFactory);
+        IndexDropOperatorDescriptor secondaryDropOp =
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 6f59ed8..ee56375 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -236,4 +236,12 @@
         outputFiles.add(fileRef.getFile());
         return new ManagedFileSplit(ncs.getId(), fileName);
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 898321b..61b600a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -25,21 +25,30 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 
 public class IndexCreateOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexBuilderFactory indexBuilderFactory;
+    private final int[][] partitionsMap;
 
-    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory) {
+    public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexBuilderFactory indexBuilderFactory,
+            int[][] partitionsMap) {
         super(spec, 0, 0);
         this.indexBuilderFactory = indexBuilderFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new IndexCreateOperatorNodePushable(indexBuilderFactory.create(ctx, partition));
+        int[] storagePartitions = partitionsMap[partition];
+        IIndexBuilder[] indexBuilders = new IIndexBuilder[storagePartitions.length];
+        for (int i = 0; i < storagePartitions.length; i++) {
+            indexBuilders[i] = indexBuilderFactory.create(ctx, storagePartitions[i]);
+        }
+        return new IndexCreateOperatorNodePushable(indexBuilders);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
index 1eb6153..9f104fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorNodePushable.java
@@ -26,10 +26,10 @@
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 
 public class IndexCreateOperatorNodePushable extends AbstractOperatorNodePushable {
-    private final IIndexBuilder indexBuilder;
+    private final IIndexBuilder[] indexBuilders;
 
-    public IndexCreateOperatorNodePushable(IIndexBuilder indexBuilder) throws HyracksDataException {
-        this.indexBuilder = indexBuilder;
+    public IndexCreateOperatorNodePushable(IIndexBuilder[] indexBuilders) {
+        this.indexBuilders = indexBuilders;
     }
 
     @Override
@@ -49,7 +49,9 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        indexBuilder.build();
+        for (IIndexBuilder indexBuilder : indexBuilders) {
+            indexBuilder.build();
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index 032e758..32019f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -36,25 +36,27 @@
         WAIT_ON_IN_USE
     }
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexDataflowHelperFactory dataflowHelperFactory;
     private final Set<DropOption> options;
+    private final int[][] partitionsMap;
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IIndexDataflowHelperFactory dataflowHelperFactory) {
-        this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class));
+            IIndexDataflowHelperFactory dataflowHelperFactory, int[][] partitionsMap) {
+        this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class), partitionsMap);
     }
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options, int[][] partitionsMap) {
         super(spec, 0, 0);
         this.dataflowHelperFactory = dataflowHelperFactory;
         this.options = options;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition);
+        return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 3b6669e..81d280e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -42,13 +42,18 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1);
-    private final IIndexDataflowHelper indexHelper;
+    private final IIndexDataflowHelper[] indexHelpers;
     private final Set<DropOption> options;
     private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30);
 
     public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, Set<DropOption> options,
-            IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+            IHyracksTaskContext ctx, int partition, int[][] partitionsMap) throws HyracksDataException {
+        int[] storagePartitions = partitionsMap[partition];
+        this.indexHelpers = new IIndexDataflowHelper[storagePartitions.length];
+        for (int i = 0; i < storagePartitions.length; i++) {
+            this.indexHelpers[i] =
+                    indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), storagePartitions[i]);
+        }
         this.options = options;
     }
 
@@ -69,7 +74,9 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        dropIndex();
+        for (IIndexDataflowHelper indexHelper : indexHelpers) {
+            dropIndex(indexHelper);
+        }
     }
 
     @Override
@@ -77,7 +84,7 @@
         // no op
     }
 
-    private void dropIndex() throws HyracksDataException {
+    private void dropIndex(IIndexDataflowHelper indexHelper) throws HyracksDataException {
         while (true) {
             try {
                 indexHelper.destroy();