[ASTERIXDB-3144][HYR][RT] Make other operators support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
This patch changes the following operators to support
operating on multiple partitions. This is a step towards
achieving compute/storage separation:
- compact dataset/index
- drop dataverse
- remove feed storage
- RTree search

Remove unused getWriteResultRuntime from IMetadataProvider.

Change-Id: I2b1223c0d1c3248305014396dec4c1d0bdef13f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17505
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index d5743cb..bfba414 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -34,8 +34,8 @@
     public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) {
         JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
         PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName());
-        FileRemoveOperatorDescriptor frod =
-                new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSpiltsProvider(), false);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec,
+                partitioningProperties.getSpiltsProvider(), false, partitioningProperties.getComputeStorageMap());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
                 partitioningProperties.getConstraints());
         jobSpec.addRoot(frod);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 07500f7..72961b8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -26,21 +26,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -50,7 +46,6 @@
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.common.base.Expression;
@@ -114,13 +109,11 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 
@@ -155,23 +148,14 @@
     }
 
     public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider metadataProvider, Feed feed)
-            throws AsterixException {
+            throws AlgebricksException {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
-        Set<String> nodes = new TreeSet<>();
-        for (String node : allCluster.getLocations()) {
-            nodes.add(node);
-        }
-        AlgebricksAbsolutePartitionConstraint locations =
-                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()]));
-        FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
-        org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
-                StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, spC.first, true);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, spC.second);
+        PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(feed);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec,
+                partitioningProperties.getSpiltsProvider(), true, partitioningProperties.getComputeStorageMap());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
+                partitioningProperties.getConstraints());
         spec.addRoot(frod);
         return spec;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c648b33..a8df92a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -683,10 +683,12 @@
         IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider());
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            int[][] partitionsMap = partitioningProperties.getComputeStorageMap();
             rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
                     indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory,
                     searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, propagateFilter,
-                    nonFilterWriterFactory, isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+                    nonFilterWriterFactory, isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+                    partitionsMap);
         } else {
             // Create the operator
             rtreeSearchOp = null;
@@ -732,52 +734,6 @@
     }
 
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-            IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        DataverseName dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
-
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
-        int[] pkFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            pkFields[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys + 1] = idx;
-        }
-
-        PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
-        long numElementsHint = getCardinalityPerPartitionHint(dataset);
-        // TODO
-        // figure out the right behavior of the bulkload and then give the
-        // right callback
-        // (ex. what's the expected behavior when there is an error during
-        // bulkload?)
-        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
-                partitioningProperties.getNumberOfPartitions());
-        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider());
-        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
-                fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
-                indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
-                partitioningProperties.getComputeStorageMap());
-        return new Pair<>(btreeBulkLoad, partitioningProperties.getConstraints());
-    }
-
-    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
             IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
@@ -972,7 +928,7 @@
     }
 
     public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
-        return dataPartitioningProvider.splitAndConstraints(dataverseName);
+        return dataPartitioningProvider.getPartitioningProperties(dataverseName);
     }
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
@@ -1837,6 +1793,10 @@
         return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
     }
 
+    public PartitioningProperties getPartitioningProperties(Feed feed) throws AlgebricksException {
+        return dataPartitioningProvider.getPartitioningProperties(feed);
+    }
+
     public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
         List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
                 .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index f5e96b1..ec4c985 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -21,14 +21,22 @@
 import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
 import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
 
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -48,7 +56,7 @@
         scheme = appCtx.getStorageProperties().getPartitioningScheme();
     }
 
-    public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
+    public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
         if (scheme == DYNAMIC) {
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
                     .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
@@ -75,6 +83,25 @@
         throw new IllegalStateException();
     }
 
+    public PartitioningProperties getPartitioningProperties(Feed feed) throws AsterixException {
+        if (scheme == DYNAMIC) {
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
+            Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
+            AlgebricksAbsolutePartitionConstraint locations =
+                    new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+            FileSplit[] feedLogFileSplits =
+                    FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+                    StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+            int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second));
+            return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
+        } else if (scheme == STATIC) {
+            throw new NotImplementedException();
+        }
+        throw new IllegalStateException();
+    }
+
     private static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
         if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
             return ((AlgebricksCountPartitionConstraint) constraint).getCount();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 27e06eb..130e39c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -370,8 +370,8 @@
         IIndexDataflowHelperFactory indexHelperFactory =
                 new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                         partitioningProperties.getSpiltsProvider());
-        LSMTreeIndexCompactOperatorDescriptor compactOp =
-                new LSMTreeIndexCompactOperatorDescriptor(spec, indexHelperFactory);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                indexHelperFactory, partitioningProperties.getComputeStorageMap());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
                 partitioningProperties.getConstraints());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 498f3d4..8dc0d96 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -100,8 +100,8 @@
         IIndexDataflowHelperFactory dataflowHelperFactory =
                 new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                         partitioningProperties.getSpiltsProvider());
-        LSMTreeIndexCompactOperatorDescriptor compactOp =
-                new LSMTreeIndexCompactOperatorDescriptor(spec, dataflowHelperFactory);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                dataflowHelperFactory, partitioningProperties.getComputeStorageMap());
         compactOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
                 secondaryPartitionConstraint);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index ef9e75b..dce8638 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -59,6 +59,7 @@
     public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
             String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) {
         super(spec, 1, 1);
+        //TODO(partitioning)
         outRecDescs[0] = rDesc;
         this.operatorName = operatorName;
         this.indexes = indexes;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 4f1dfd7..50fe998 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -47,6 +47,7 @@
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.outRecDescs[0] = outRecDesc;
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
index f788d23..488b73b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
@@ -48,6 +48,7 @@
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree, boolean excludeUnknownKeys,
             boolean forAnyUnknownKey) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.outRecDescs[0] = outRecDesc;
         this.missingWriterFactory = missingWriterFactory;
         this.numTagFields = numTagFields;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4a6e76e..e34ff21 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -66,11 +66,6 @@
             IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
             IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
 
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
-            List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
-            throws AlgebricksException;
-
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterKeyFields,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index ed03cc7..49110b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -82,8 +82,8 @@
      *
      * @param constraintAcceptor
      *            - Constraint Acceptor
-     * @param plan
-     *            - Job Plan
+     * @param ccServiceCtx
+     *            - CC Service Context
      */
     void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCServiceContext ccServiceCtx);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index 57a5f69..76cd48a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.dataflow.std.file;
 
 import java.io.File;
-import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -32,36 +31,31 @@
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
 public class FileRemoveOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
+    private static final long serialVersionUID = 2L;
+
     private final IFileSplitProvider fileSplitProvider;
     private final boolean quietly;
+    private final int[][] partitionsMap;
 
     public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder,
-            boolean quietly) {
+            boolean quietly, int[][] partitionsMap) {
         super(spec, 0, 0);
         this.fileSplitProvider = fileSplitProvder;
         this.quietly = quietly;
+        this.partitionsMap = partitionsMap;
     }
 
-    /**
-     *
-     * @deprecated use {@link #FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
-     */
-    @Deprecated
-    public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
-        this(spec, fileSplitProvder, false);
-    }
-
-    private static final long serialVersionUID = 1L;
-
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+        final FileSplit[] splits = fileSplitProvider.getFileSplits();
+        final int[] splitsIndexes = partitionsMap[partition];
         final IIOManager ioManager = ctx.getIoManager();
         return new AbstractOperatorNodePushable() {
 
@@ -73,16 +67,7 @@
             @Override
             public void initialize() throws HyracksDataException {
                 // will only work for files inside the io devices
-                File f = split.getFile(ioManager);
-                if (quietly) {
-                    FileUtils.deleteQuietly(f);
-                } else {
-                    try {
-                        FileUtils.deleteDirectory(f);
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                }
+                deleteFiles();
             }
 
             @Override
@@ -98,6 +83,25 @@
             @Override
             public void deinitialize() throws HyracksDataException {
             }
+
+            private void deleteFiles() throws HyracksDataException {
+                Throwable failure = null;
+                for (int splitsIndex : splitsIndexes) {
+                    try {
+                        File file = splits[splitsIndex].getFile(ioManager);
+                        if (quietly) {
+                            FileUtils.deleteQuietly(file);
+                        } else {
+                            FileUtils.deleteDirectory(file);
+                        }
+                    } catch (Throwable th) {
+                        failure = ExceptionUtils.suppress(failure, th);
+                    }
+                }
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
+                }
+            }
         };
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index 7b687c4..1302bce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -30,9 +30,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    private int[] fieldSlots;
-    private byte[] tupleData;
-    private int tupleSize;
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
 
     public ConstantTupleSourceOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
             int[] fieldSlots, byte[] tupleData, int tupleSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 9bd0c59..785a330 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -26,11 +26,11 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private IHyracksTaskContext ctx;
 
-    private int[] fieldSlots;
-    private byte[] tupleData;
-    private int tupleSize;
+    private final IHyracksTaskContext ctx;
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
 
     public ConstantTupleSourceOperatorNodePushable(IHyracksTaskContext ctx, int[] fieldSlots, byte[] tupleData,
             int tupleSize) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
index 6a31962..68e4ac5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.rtree.RTreeSecondaryIndexSearchOperatorTest;
 import org.junit.Test;
@@ -95,7 +96,7 @@
         int[] keyFields = { 0, 1, 2, 3 };
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec,
                 secondaryWithFilterRecDesc, keyFields, true, true, secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index 0fcf892..f233417 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -86,9 +87,9 @@
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
         int[] keyFields = { 0, 1, 2, 3 };
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 3cfef19..2748988 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,9 +85,9 @@
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
         int[] keyFields = null;
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 8ea0701..da5de77 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,9 +85,9 @@
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
         int[] keyFields = { 0, 1, 2, 3 };
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 88c06eb..c3719c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -131,6 +131,7 @@
             ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
         this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+        //TODO(partitioning) partitionsMap should not be null
         this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
         for (int i = 0; i < partitions.length; i++) {
             storagePartitionId2Index.put(partitions[i], i);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index 099d668..06dad1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -37,6 +37,7 @@
     public TreeIndexDiskOrderScanOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) {
         super(spec, 0, 1);
+        //TODO(maybe don't fix)
         this.indexHelperFactory = indexHelperFactory;
         this.searchCallbackFactory = searchCallbackFactory;
         this.outRecDescs[0] = outRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index 6ed1858..104eed4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -40,6 +40,7 @@
     public TreeIndexStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, IStorageManager storageManager,
             IIndexDataflowHelperFactory indexHelperFactory) {
         super(spec, 0, 1);
+        //TODO(maybe don't fix)
         this.indexHelperFactory = indexHelperFactory;
         this.storageManager = storageManager;
         this.outRecDescs[0] = recDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
index 291363d..da41f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
@@ -38,6 +38,7 @@
     public LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.indexHelperFactory = indexHelperFactory;
         this.searchCallbackFactory = searchCallbackFactory;
         this.outRecDescs[0] = outRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 682ffef..139a871 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -30,16 +31,25 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 
 public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
-    private final IIndexDataflowHelper indexHelper;
+
+    private final IIndexDataflowHelper[] indexHelpers;
 
     public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int partition,
-            IIndexDataflowHelperFactory indexHelperFactory) throws HyracksDataException {
-        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+            IIndexDataflowHelperFactory indexHelperFactory, int[][] partitionsMap) throws HyracksDataException {
+        int[] partitions = partitionsMap[partition];
+        indexHelpers = new IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+        }
+
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        indexHelper.close();
+        Throwable failure = CleanupUtils.close(indexHelpers, null);
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
+        }
     }
 
     @Override
@@ -54,10 +64,12 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        indexHelper.open();
-        ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
-        ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        accessor.scheduleFullMerge();
+        for (IIndexDataflowHelper indexHelper : indexHelpers) {
+            indexHelper.open();
+            ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            accessor.scheduleFullMerge();
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
index 6da4c8f..aad1cc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -29,18 +29,20 @@
 
 public class LSMTreeIndexCompactOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexDataflowHelperFactory indexHelperFactory;
+    private final int[][] partitionsMap;
 
     public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IIndexDataflowHelperFactory indexHelperFactory) {
+            IIndexDataflowHelperFactory indexHelperFactory, int[][] partitionsMap) {
         super(spec, 0, 0);
         this.indexHelperFactory = indexHelperFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new LSMIndexCompactOperatorNodePushable(ctx, partition, indexHelperFactory);
+        return new LSMIndexCompactOperatorNodePushable(ctx, partition, indexHelperFactory, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index cc6d76d..8e7a7fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -32,7 +32,7 @@
 
 public class RTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     protected final int[] keyFields;
     protected final boolean lowKeyInclusive;
     protected final boolean highKeyInclusive;
@@ -45,6 +45,7 @@
     protected final boolean retainMissing;
     protected final IMissingWriterFactory missingWriterFactory;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected final int[][] partitionsMap;
     protected boolean appendOpCallbackProceedResult;
     protected byte[] searchCallbackProceedResultFalseValue;
     protected byte[] searchCallbackProceedResultTrueValue;
@@ -53,10 +54,11 @@
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory indexHelperFactory,
             boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes,
-            int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory) {
+            int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
+            int[][] partitionsMap) {
         this(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
                 retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
-                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, false, null, null);
+                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, false, null, null, partitionsMap);
     }
 
     public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] keyFields,
@@ -65,7 +67,7 @@
             ISearchOperationCallbackFactory searchCallbackFactory, int[] minFilterFieldIndexes,
             int[] maxFilterFieldIndexes, boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) {
+            byte[] searchCallbackProceedResultTrueValue, int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -79,6 +81,7 @@
         this.maxFilterFieldIndexes = maxFilterFieldIndexes;
         this.appendIndexFilter = appendIndexFilter;
         this.nonFilterWriterFactory = nonFilterWriterFactory;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
         this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
         this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
@@ -92,6 +95,6 @@
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), keyFields, minFilterFieldIndexes,
                 maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory,
                 searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index aef624f..7a7dc0e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -47,12 +47,12 @@
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
             boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, int[][] partitionsMap) throws HyracksDataException {
         // TODO: predicate & limit pushdown not enabled for RTree yet
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, null, -1, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
-                searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE, null, null);
+                searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE, null, partitionsMap);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);