[ASTERIXDB-3144][RT] Make index modification runtime support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
This patch changes the index modification runtime to support
operating on multiple partitions. With this change, a single index
modification node pushable can write to multiple indexes, one per
storage partition assigned to it. This is a step towards
achieving compute/storage separation.

Change-Id: I08da28f2a26fcaf581c2256312455fe541fae5ea
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17452
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 220311e..7490220 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -106,6 +106,25 @@
      * Close the AutoCloseable and suppress any Throwable thrown by the close call.
      * This method must NEVER throw any Throwable
      *
+     * @param closables
+     *            the resources to close; may be null, in which case nothing is closed
+     * @param root
+     *            the first exception encountered during release of resources
+     * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
+     */
+    public static Throwable close(AutoCloseable[] closables, Throwable root) {
+        if (closables != null) {
+            for (AutoCloseable closable : closables) {
+                root = close(closable, root);
+            }
+        }
+        return root;
+    }
+
+    /**
+     * Close the AutoCloseable and suppress any Throwable thrown by the close call.
+     * This method must NEVER throw any Throwable
+     *
      * @param closable
      *            the resource to close
      * @param root
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index 292592b..cc922ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobId;
@@ -37,6 +38,7 @@
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -148,10 +150,16 @@
         IIndexDataflowHelperFactory primaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, primarySplitProvider);
 
+        int[][] partitionsMap = getPartitionsMap(splitNCs.length);
+        int[] pkFields = new int[] { primaryFieldPermutation[0] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
         // create operator descriptor
-        TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert =
-                new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, recDesc, primaryFieldPermutation,
-                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+        TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recDesc, primaryFieldPermutation, IndexOperation.INSERT, primaryHelperFactory, null,
+                NoOpOperationCallbackFactory.INSTANCE, tuplePartitionerFactory, partitionsMap);
         JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
 
         // prepare insertion into secondary index
@@ -174,9 +182,15 @@
         IIndexDataflowHelperFactory secondaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, secondarySplitProvider);
         // create operator descriptor
+        int[] pkFields2 = new int[] { secondaryFieldPermutation[1] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories2 =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory2 =
+                new FieldHashPartitionerFactory(pkFields2, pkHashFunFactories2, splitNCs.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, recDesc, secondaryFieldPermutation,
-                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
 
         // end the insert pipeline at this sink operator
@@ -202,4 +216,12 @@
 
         return spec;
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index eac332d..80ce5c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -22,10 +22,13 @@
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.inputParserFactories;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.inputRecordDesc;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryFieldPermutation;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryHashFunFactories;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldCount;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldPermutation;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryRecDesc;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryFieldPermutationA;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryFieldPermutationB;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryPKFieldPermutationB;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryRecDesc;
 
 import java.io.DataOutput;
@@ -33,6 +36,7 @@
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
@@ -41,6 +45,7 @@
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -69,6 +74,7 @@
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.common.TreeOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -226,17 +232,26 @@
                 new DelimitedDataTupleParserFactory(inputParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
+
         // insert into primary index
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryBtreeInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, primaryFieldPermutation,
-                        pipelineOperation, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        pipelineOperation, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeInsertOp, NC1_ID);
 
         // first secondary index
         int[] fieldPermutationB = secondaryFieldPermutationB;
+        ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory(
+                secondaryPKFieldPermutationB, primaryHashFunFactories, ordersSplits.length);
+
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, fieldPermutationB,
-                        pipelineOperation, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        pipelineOperation, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
index fbb59e2..8ff1736 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
@@ -20,9 +20,11 @@
 package org.apache.hyracks.tests.am.btree;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -45,6 +47,7 @@
 
     // field, type and key declarations for primary index
     public static int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7 };
+    public static int[] primaryKeyFieldPermutation = new int[] { 0 };
     public static final int[] primaryFilterFields = new int[] { 0 };
     public static final int[] primaryBtreeFields = new int[] { 0, 1, 2, 3, 4, 5 };
 
@@ -58,6 +61,8 @@
 
     public static final IBinaryComparatorFactory[] primaryComparatorFactories =
             new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE };
+    public static final IBinaryHashFunctionFactory[] primaryHashFunFactories =
+            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
     public static final int primaryKeyFieldCount = primaryComparatorFactories.length;
 
     public static final int[] primaryBloomFilterKeyFields = new int[] { 0 };
@@ -78,6 +83,7 @@
     public static final int secondaryKeyFieldCount = 2;
     public static final int[] secondaryFieldPermutationA = { 3, 0 };
     public static final int[] secondaryFieldPermutationB = { 4, 0 };
+    public static final int[] secondaryPKFieldPermutationB = { 1 };
     public static final int[] secondaryFilterFields = new int[] { 1 };
     public static final int[] secondaryBtreeFields = new int[] { 0, 1 };
     public static final int[] secondaryBloomFilterKeyFields = new int[] { 0, 1 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index f849bc8..71ee656 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.hyracks.tests.am.rtree;
 
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryHashFunFactories;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldPermutation;
+
 import java.io.DataOutput;
 import java.io.File;
 
@@ -26,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -42,6 +46,7 @@
 import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -74,6 +79,7 @@
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -367,18 +373,25 @@
                 ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
         // insert into primary index
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, primaryFieldPermutation,
-                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryInsertOp, NC1_ID);
 
         // secondary index
         int[] secondaryFieldPermutation = { 9, 10, 11, 12, 0 };
+        ITuplePartitionerFactory tuplePartitionerFactory2 =
+                new FieldHashPartitionerFactory(new int[] { 4 }, primaryHashFunFactories, ordersSplits.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, secondaryFieldPermutation,
-                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
index d53a04c..8f3181d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
@@ -33,7 +33,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-class TestUtil {
+public class TestUtil {
 
     private static final String HOST = "127.0.0.1";
     private static final int PORT = 16001;
@@ -68,4 +68,12 @@
     static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
         return getResultAsJson(httpGetAsString(uri));
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 9ff9249..d3def46 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -23,6 +23,8 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -38,53 +40,76 @@
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.LocalResource;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IHyracksTaskContext ctx;
-    protected final IIndexDataflowHelper indexHelper;
     protected final RecordDescriptor inputRecDesc;
     protected final IndexOperation op;
     protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
     protected FrameTupleAccessor accessor;
     protected FrameTupleReference frameTuple;
     protected IFrame writeBuffer;
-    protected IIndexAccessor indexAccessor;
     protected ITupleFilter tupleFilter;
-    protected IModificationOperationCallback modCallback;
-    protected IIndex index;
+    protected final IIndex[] indexes;
+    protected final IIndexAccessor[] indexAccessors;
+    protected final IIndexDataflowHelper[] indexHelpers;
+    protected final IModificationOperationCallback[] modCallbacks;
     protected final IModificationOperationCallbackFactory modOpCallbackFactory;
     protected final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitioner tuplePartitioner;
+    protected final int[] partitions;
+    protected final Int2IntMap storagePartitionId2Index;
+    protected boolean writerOpen;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IndexOperation op, IModificationOperationCallbackFactory modOpCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
+        this.indexes = new IIndex[partitions.length];
+        this.indexAccessors = new IIndexAccessor[partitions.length];
+        this.modCallbacks = new IModificationOperationCallback[partitions.length];
+        this.storagePartitionId2Index = new Int2IntOpenHashMap();
+        this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            storagePartitionId2Index.put(partitions[i], i);
+            indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+        }
         this.modOpCallbackFactory = modOpCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.inputRecDesc = inputRecDesc;
         this.op = op;
         this.tuple.setFieldPermutation(fieldPermutation);
+        this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
     }
 
     @Override
     public void open() throws HyracksDataException {
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
         try {
+            for (int i = 0; i < indexHelpers.length; i++) {
+                IIndexDataflowHelper indexHelper = indexHelpers[i];
+                indexHelper.open();
+                indexes[i] = indexHelper.getIndexInstance();
+                LocalResource resource = indexHelper.getResource();
+                modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
+                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
+                indexAccessors[i] = indexes[i].createAccessor(iap);
+            }
             writer.open();
-            LocalResource resource = indexHelper.getResource();
-            modCallback = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            indexAccessor = index.createAccessor(iap);
+            writerOpen = true;
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
                 frameTuple = new FrameTupleReference();
@@ -108,6 +133,9 @@
                 }
                 tuple.reset(accessor, i);
 
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                IIndexAccessor indexAccessor = indexAccessors[storageIdx];
                 switch (op) {
                     case INSERT: {
                         try {
@@ -158,18 +186,24 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (index != null) {
-            try {
+        Throwable failure = null;
+        try {
+            if (writerOpen) {
                 writer.close();
-            } finally {
-                indexHelper.close();
             }
+        } finally {
+            for (IIndexDataflowHelper indexHelper : indexHelpers) {
+                failure = ResourceReleaseUtils.close(indexHelper, failure);
+            }
+        }
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        if (index != null) {
+        if (writerOpen) {
             writer.fail();
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index f5cab5c..81607e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -32,17 +33,19 @@
 
 public class TreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexDataflowHelperFactory indexHelperFactory;
     private final IndexOperation op;
     private final int[] fieldPermutation;
     private final IModificationOperationCallbackFactory modificationOpCallbackFactory;
     private final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitionerFactory tuplePartitionerFactory;
+    protected final int[][] partitionsMap;
 
     public TreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
-            ITupleFilterFactory tupleFilterFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.fieldPermutation = fieldPermutation;
@@ -50,6 +53,8 @@
         this.modificationOpCallbackFactory = modificationOpCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.outRecDescs[0] = outRecDesc;
+        this.tuplePartitionerFactory = tuplePartitionerFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -57,6 +62,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new IndexInsertUpdateDeleteOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), op, modificationOpCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index d8d155e..ba932e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -94,5 +94,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index b6192c1..4cd8a11 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface IFrameTupleProcessor {
@@ -37,7 +38,7 @@
      *            the index of the tuple in the frame
      * @throws HyracksDataException
      */
-    void process(ITupleReference tuple, int index) throws HyracksDataException;
+    void process(FrameTupleAccessor accessor, ITupleReference tuple, int index) throws HyracksDataException;
 
     /**
      * Called once per batch before ending the batch process
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..214d9dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.api;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,7 +40,6 @@
      * @param tuple
      *            the operation tuple
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
 
@@ -54,7 +54,6 @@
      *            the operation tuple
      * @return
      * @throws HyracksDataException
-     * @throws IndexException
      */
     boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException;
@@ -69,7 +68,6 @@
      * @param pred
      *            the search predicate
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
 
@@ -104,9 +102,7 @@
      * Schedule a merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -114,9 +110,7 @@
      * Schedule full merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -125,7 +119,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void merge(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -133,7 +126,6 @@
      * Schedule a flush
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
      */
     ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +135,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void flush(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -153,7 +144,6 @@
      * @param ioOperation
      *            the io operation that added the new component
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
 
@@ -235,10 +225,13 @@
      *            the tuple processor
      * @param frameOpCallback
      *            the callback at the end of the frame
+     * @param tuples
+     *            the indexes of the tuples in the frame to process; all other tuples are skipped
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            throws HyracksDataException;
 
     /**
      * Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 02744ee..c64e1e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -47,15 +48,15 @@
     public LSMIndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int nextFlushTupleIndex = 0;
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
@@ -68,6 +69,9 @@
                 }
                 tuple.reset(accessor, i);
 
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
                 switch (op) {
                     case INSERT: {
                         if (!lsmAccessor.tryInsert(tuple)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index fb884f7..26ce56a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -33,24 +34,29 @@
 
 public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     protected final int[] fieldPermutation;
     protected final IndexOperation op;
     protected final IIndexDataflowHelperFactory indexHelperFactory;
     protected final IModificationOperationCallbackFactory modCallbackFactory;
     protected final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitionerFactory tuplePartitionerFactory;
+    protected final int[][] partitionsMap;
 
     public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
             RecordDescriptor outRecDesc, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation,
             IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.modCallbackFactory = modCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.fieldPermutation = fieldPermutation;
         this.op = op;
+        this.tuplePartitionerFactory = tuplePartitionerFactory;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
     }
 
@@ -59,6 +65,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new LSMIndexInsertUpdateDeleteOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), op, modCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 950a8e5..e2c9fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
@@ -711,12 +712,13 @@
 
     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            throws HyracksDataException {
         processor.start();
         enter(ctx);
         try {
             try {
-                processFrame(accessor, tuple, processor);
+                processFrame(accessor, tuple, processor, tuples);
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -860,13 +862,14 @@
     }
 
     private static void processFrame(FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor) throws HyracksDataException {
+            IFrameTupleProcessor processor, Set<Integer> tuples) throws HyracksDataException {
         int tupleCount = accessor.getTupleCount();
-        int i = 0;
-        while (i < tupleCount) {
+        for (int i = 0; i < tupleCount; i++) {
+            if (!tuples.contains(i)) {
+                continue;
+            }
             tuple.reset(accessor, i);
-            processor.process(tuple, i);
-            i++;
+            processor.process(accessor, tuple, i);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..fb5984d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -210,8 +211,8 @@
     }
 
     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
-            IFrameOperationCallback frameOpCallback) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+            IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
     }
 
     @Override