[ASTERIXDB-3144][HYR][RT] Make sampling job support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch changes the sampling job to support
operating on multiple partitions. This is a step towards
achieving compute/storage separation.

Change-Id: If9abc68402adfe47ddeb5f1b1499e3414369f506
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17511
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index a3518852..8e32fa2 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -658,6 +658,12 @@
       <artifactId>hyracks-util</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d2fd4be..98d93fd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4459,7 +4459,7 @@
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (Index index : indexes) {
-                    if (index.isSecondaryIndex()) {
+                    if (index.isSecondaryIndex() && !index.isSampleIndex()) {
                         jobsToExecute.add(
                                 IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider, sourceLoc));
                     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 682b636..d3c708d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -116,6 +116,7 @@
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.TestUtil;
 import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -214,7 +215,7 @@
                 fieldPermutation[i] = i;
             }
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -263,7 +264,7 @@
             }
 
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -372,7 +373,7 @@
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -838,7 +839,7 @@
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
         IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
         ITuplePartitionerFactory tuplePartitionerFactory =
                 new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -912,12 +913,4 @@
         }
         return new RecordDescriptor(outputSerDes, outputTypeTraits);
     }
-
-    private static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index e1092dd..bb52946 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -57,7 +57,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -176,7 +176,7 @@
         dataflowHelper.open();
         // try to drop in-use index (should fail)
         IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtil.getPartitionsMap(1));
         try {
             dropInUseOp.initialize();
         } catch (HyracksDataException e) {
@@ -192,7 +192,7 @@
         dropFailed.set(false);
         // drop with option wait for in-use should be successful once the index is closed
         final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtils.getPartitionsMap(1));
+                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtil.getPartitionsMap(1));
         Thread dropThread = new Thread(() -> {
             try {
                 dropWithWaitOp.initialize();
@@ -216,7 +216,7 @@
         dropFailed.set(false);
         // Dropping non-existing index
         IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtil.getPartitionsMap(1));
         try {
             dropNonExistingOp.initialize();
         } catch (HyracksDataException e) {
@@ -232,7 +232,7 @@
         // Dropping non-existing index with if exists option should be successful
         dropFailed.set(false);
         IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtils.getPartitionsMap(1));
+                EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtil.getPartitionsMap(1));
         try {
             dropNonExistingWithIfExistsOp.initialize();
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0b011d0..3404ace 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -90,7 +89,6 @@
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataUtil;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionExtensionManager;
 import org.apache.asterix.om.functions.IFunctionManager;
@@ -154,7 +152,6 @@
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -1796,22 +1793,9 @@
         return dataPartitioningProvider.getPartitioningProperties(feed);
     }
 
-    public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
-        List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
-                .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
-                .collect(Collectors.toList());
-        if (dsIndexes.isEmpty()) {
-            return Collections.emptyList();
-        }
-        List<String> datasetNodes = findNodes(ds.getNodeGroupName());
-        List<Pair<IFileSplitProvider, String>> indexesSplits =
-                dsIndexes.stream()
-                        .map(idx -> new Pair<>(
-                                StoragePathUtil.splitProvider(SplitsAndConstraintsUtil.getIndexSplits(
-                                        appCtx.getClusterStateManager(), ds, idx.getIndexName(), datasetNodes)),
-                                idx.getIndexName()))
-                        .collect(Collectors.toList());
-        return indexesSplits;
+    public List<Index> getSecondaryIndexes(Dataset ds) throws AlgebricksException {
+        return getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
+                .filter(idx -> idx.isSecondaryIndex() && !idx.isSampleIndex()).collect(Collectors.toList());
     }
 
     public LockList getLocks() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index b6f0e96..030895b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -102,7 +102,7 @@
 
     private final MetadataProvider metadataProvider;
     private final Dataset dataset;
-    private final Index index;
+    private final Index sampleIdx;
     private final SourceLocation sourceLoc;
 
     private ARecordType itemType;
@@ -115,11 +115,12 @@
     private Map<String, String> mergePolicyProperties;
     private int groupbyNumFrames;
     private int[][] computeStorageMap;
+    private int numPartitions;
 
-    protected SampleOperationsHelper(Dataset dataset, Index index, MetadataProvider metadataProvider,
+    protected SampleOperationsHelper(Dataset dataset, Index sampleIdx, MetadataProvider metadataProvider,
             SourceLocation sourceLoc) {
         this.dataset = dataset;
-        this.index = index;
+        this.sampleIdx = sampleIdx;
         this.metadataProvider = metadataProvider;
         this.sourceLoc = sourceLoc;
     }
@@ -135,11 +136,16 @@
         comparatorFactories = dataset.getPrimaryComparatorFactories(metadataProvider, itemType, metaType);
         groupbyNumFrames = getGroupByNumFrames(metadataProvider, sourceLoc);
 
-        PartitioningProperties partitioningProperties =
-                metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
-        fileSplitProvider = partitioningProperties.getSpiltsProvider();
-        partitionConstraint = partitioningProperties.getConstraints();
-        computeStorageMap = partitioningProperties.getComputeStorageMap();
+        // make sure to always use the dataset + index to get the partitioning properties
+        // this is because in some situations the nodegroup of the passed dataset is different from the index
+        // this can happen during a rebalance for example where the dataset represents the new target dataset while
+        // the index object information is fetched from the old source dataset
+        PartitioningProperties samplePartitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, sampleIdx.getIndexName());
+        fileSplitProvider = samplePartitioningProperties.getSpiltsProvider();
+        partitionConstraint = samplePartitioningProperties.getConstraints();
+        computeStorageMap = samplePartitioningProperties.getComputeStorageMap();
+        numPartitions = samplePartitioningProperties.getNumberOfPartitions();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         mergePolicyFactory = compactionInfo.first;
@@ -150,7 +156,7 @@
     public JobSpecification buildCreationJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
-        IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
+        IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
                 mergePolicyFactory, mergePolicyProperties);
         IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                 fileSplitProvider, resourceFactory, true);
@@ -165,7 +171,7 @@
 
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
-        Index.SampleIndexDetails indexDetails = (Index.SampleIndexDetails) index.getIndexDetails();
+        Index.SampleIndexDetails indexDetails = (Index.SampleIndexDetails) sampleIdx.getIndexDetails();
         int sampleCardinalityTarget = indexDetails.getSampleCardinalityTarget();
         long sampleSeed = indexDetails.getSampleSeed();
         IDataFormat format = metadataProvider.getDataFormat();
@@ -189,16 +195,18 @@
         sourceOp = targetOp;
 
         // primary index scan ----> stream stats op
-        List<Pair<IFileSplitProvider, String>> indexesInfo = metadataProvider.getSplitProviderOfAllIndexes(dataset);
-        IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[indexesInfo.size()];
-        String[] names = new String[indexesInfo.size()];
+        List<Index> dsIndexes = metadataProvider.getSecondaryIndexes(dataset);
+        IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[dsIndexes.size()];
+        String[] names = new String[dsIndexes.size()];
         for (int i = 0; i < indexes.length; i++) {
-            Pair<IFileSplitProvider, String> indexInfo = indexesInfo.get(i);
-            indexes[i] = new IndexDataflowHelperFactory(storageMgr, indexInfo.first);
-            names[i] = indexInfo.second;
+            Index idx = dsIndexes.get(i);
+            PartitioningProperties idxPartitioningProps =
+                    metadataProvider.getPartitioningProperties(dataset, idx.getIndexName());
+            indexes[i] = new IndexDataflowHelperFactory(storageMgr, idxPartitioningProps.getSpiltsProvider());
+            names[i] = idx.getIndexName();
         }
-        targetOp =
-                new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes, names);
+        targetOp = new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes,
+                names, computeStorageMap);
         spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
         sourceOp = targetOp;
 
@@ -318,18 +326,17 @@
     protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor,
             long numElementHint) throws AlgebricksException {
-        PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
         int[] pkFields = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFields.length; i++) {
             pkFields[i] = fieldPermutation[i];
         }
         IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
-        ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
-                partitioningProperties.getNumberOfPartitions());
+        ITuplePartitionerFactory partitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
         LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
                 recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null,
                 LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
-                partitioningProperties.getComputeStorageMap());
+                computeStorageMap);
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 partitionConstraint);
@@ -339,7 +346,7 @@
     @Override
     public JobSpecification buildDropJobSpec(Set<IndexDropOperatorDescriptor.DropOption> options)
             throws AlgebricksException {
-        return SecondaryTreeIndexOperationsHelper.buildDropJobSpecImpl(dataset, index, options, metadataProvider,
+        return SecondaryTreeIndexOperationsHelper.buildDropJobSpecImpl(dataset, sampleIdx, options, metadataProvider,
                 sourceLoc);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index dce8638..f712bdd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -50,20 +50,21 @@
  */
 public final class DatasetStreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final String operatorName;
     private final IIndexDataflowHelperFactory[] indexes;
     private final String[] indexesNames;
+    private final int[][] partitionsMap;
 
     public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
-            String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) {
+            String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames, int[][] partitionsMap) {
         super(spec, 1, 1);
-        //TODO(partitioning)
         outRecDescs[0] = rDesc;
         this.operatorName = operatorName;
         this.indexes = indexes;
         this.indexesNames = indexesNames;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -75,7 +76,7 @@
             private FrameTupleAccessor fta;
             private long totalTupleCount;
             private long totalTupleLength;
-            private Map<String, IndexStats> indexStats;
+            private Map<String, IndexStats> indexesStats;
 
             @Override
             public void open() throws HyracksDataException {
@@ -87,27 +88,9 @@
                     coll.add(new OperatorStats(operatorName));
                 }
                 INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
-                indexStats = new HashMap<>();
-                for (int i = 0; i < indexes.length; i++) {
-                    IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition);
-                    try {
-                        idxFlowHelper.open();
-                        ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
-                        long numPages = 0;
-                        synchronized (indexInstance.getOperationTracker()) {
-                            for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
-                                long componentSize = component.getComponentSize();
-                                if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
-                                    componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
-                                            .getBloomFilter().getFileReference().getFile().length();
-                                }
-                                numPages += componentSize / indexInstance.getBufferCache().getPageSize();
-                            }
-                        }
-                        indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages));
-                    } finally {
-                        idxFlowHelper.close();
-                    }
+                indexesStats = new HashMap<>();
+                if (indexes.length > 0) {
+                    gatherIndexesStats(serviceCtx, partitionsMap[partition]);
                 }
             }
 
@@ -136,7 +119,7 @@
                 IStatsCollector statsCollector = ctx.getStatsCollector();
                 if (statsCollector != null) {
                     IOperatorStats stats = statsCollector.getOperatorStats(operatorName);
-                    DatasetStreamStats.update(stats, totalTupleCount, totalTupleLength, indexStats);
+                    DatasetStreamStats.update(stats, totalTupleCount, totalTupleLength, indexesStats);
                 }
                 writer.close();
             }
@@ -150,6 +133,34 @@
             public String getDisplayName() {
                 return operatorName;
             }
+
+            private void gatherIndexesStats(INCServiceContext srcCtx, int[] partitions) throws HyracksDataException {
+                for (int p : partitions) {
+                    for (int i = 0; i < indexes.length; i++) {
+                        IIndexDataflowHelper idxFlowHelper = indexes[i].create(srcCtx, p);
+                        try {
+                            idxFlowHelper.open();
+                            ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
+                            long numPages = 0;
+                            synchronized (indexInstance.getOperationTracker()) {
+                                for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
+                                    long componentSize = component.getComponentSize();
+                                    if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
+                                        componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
+                                                .getBloomFilter().getFileReference().getFile().length();
+                                    }
+                                    numPages += componentSize / indexInstance.getBufferCache().getPageSize();
+                                }
+                            }
+                            IndexStats indexStats = indexesStats.computeIfAbsent(indexesNames[i],
+                                    idxName -> new IndexStats(idxName, 0));
+                            indexStats.updateNumPages(numPages);
+                        } finally {
+                            idxFlowHelper.close();
+                        }
+                    }
+                }
+            }
         };
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
index 0c471ef..b6760e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
@@ -27,6 +27,9 @@
 import org.apache.hyracks.api.io.IWritable;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 
+/**
+ * Currently, this class represents the stats of an index across all the partitions. The stats are not per partition.
+ */
 public class IndexStats implements IWritable, Serializable {
 
     private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 470f413..700354e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -183,6 +183,13 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 9d51420..d90a212 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -72,10 +72,10 @@
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestStorageManager;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
-import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.common.TreeOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -121,7 +121,7 @@
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -144,7 +144,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
-        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(1);
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 primaryHashFunFactories, ordersSplits.length);
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad =
@@ -169,7 +169,7 @@
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -210,7 +210,7 @@
 
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
-        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(1);
         int numPartitions =
                 Arrays.stream(partitionsMap).map(partitions -> partitions.length).mapToInt(Integer::intValue).sum();
         ITuplePartitionerFactory tuplePartitionerFactory2 =
@@ -245,7 +245,7 @@
                 new DelimitedDataTupleParserFactory(inputParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-        int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 primaryHashFunFactories, ordersSplits.length);
 
@@ -283,7 +283,7 @@
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor primaryDropOp =
-                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -292,7 +292,7 @@
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor secondaryDropOp =
-                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
index 68e4ac5..ae599d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
@@ -44,9 +44,9 @@
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
-import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.rtree.RTreeSecondaryIndexSearchOperatorTest;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.Test;
 
 public class LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest
@@ -96,7 +96,7 @@
         int[] keyFields = { 0, 1, 2, 3 };
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec,
                 secondaryWithFilterRecDesc, keyFields, true, true, secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index f1dbe5f..1634b2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -79,9 +79,9 @@
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestStorageManager;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
-import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -243,7 +243,7 @@
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -280,7 +280,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
-        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(1);
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 primaryHashFunFactories, ordersSplits.length);
         TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad =
@@ -306,7 +306,7 @@
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -343,7 +343,7 @@
         // load secondary index
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         int[] pkFields = { 4 };
-        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(1);
         int numPartitions =
                 Arrays.stream(partitionsMap).map(partitions -> partitions.length).mapToInt(Integer::intValue).sum();
         ITuplePartitionerFactory partitionerFactory =
@@ -391,7 +391,7 @@
                 ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-        int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
         // insert into primary index
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
@@ -428,7 +428,7 @@
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor primaryDropOp =
-                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -437,7 +437,7 @@
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor secondaryDropOp =
-                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index f233417..516dff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -43,7 +43,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
-import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -89,7 +89,7 @@
         int[] keyFields = { 0, 1, 2, 3 };
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 keyFields, true, true, secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 2748988..b626a01 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -42,7 +42,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
-import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -87,7 +87,7 @@
         int[] keyFields = null;
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 keyFields, true, true, secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { createFile(nc1) });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index da5de77..cde4667 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -43,7 +43,7 @@
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
-import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -87,7 +87,7 @@
         int[] keyFields = { 0, 1, 2, 3 };
         RTreeSearchOperatorDescriptor secondarySearchOp = new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
                 keyFields, true, true, secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtils.getPartitionsMap(1));
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, null, TestUtil.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ApplicationDeploymentAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ApplicationDeploymentAPIIntegrationTest.java
index e1300a3..54480d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ApplicationDeploymentAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ApplicationDeploymentAPIIntegrationTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.tests.integration;
 
-import static org.apache.hyracks.tests.integration.TestUtil.uri;
+import static org.apache.hyracks.util.TestUtil.uri;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
index f2f8061..29428ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.tests.integration;
 
-import static org.apache.hyracks.tests.integration.TestUtil.httpGetAsObject;
+import static org.apache.hyracks.util.TestUtil.httpGetAsObject;
 
 import java.io.IOException;
 import java.net.URI;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
index 9d79d80..dbd7174 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.tests.integration;
 
-import static org.apache.hyracks.tests.integration.TestUtil.httpGetAsObject;
+import static org.apache.hyracks.util.TestUtil.httpGetAsObject;
 
 import java.io.File;
 import java.io.IOException;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/StaticResourcesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/StaticResourcesAPIIntegrationTest.java
index 7258b16..b4d6bd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/StaticResourcesAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/StaticResourcesAPIIntegrationTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.tests.integration;
 
-import static org.apache.hyracks.tests.integration.TestUtil.httpGetAsString;
+import static org.apache.hyracks.util.TestUtil.httpGetAsString;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 8a08074..573656f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -117,5 +117,12 @@
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 3fd0cf9..bc111dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
-import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -53,18 +52,6 @@
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
             boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
-            IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
-        this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
-                minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
-                nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, null, -1,
-                false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
-    }
-
-    public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
-            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
-            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
-            boolean retainInput, boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index a2487d3..b5c8a4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -53,10 +53,12 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.util.TestUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -277,7 +279,6 @@
     /**
      * @return a list of writers to test. these writers can be of the same type but behave differently based on included mocks
      * @throws HyracksDataException
-     * @throws IndexException
      */
     public IFrameWriter[] createWriters() throws HyracksDataException {
         ArrayList<BTreeSearchOperatorNodePushable> writers = new ArrayList<>();
@@ -285,6 +286,7 @@
         IRecordDescriptorProvider[] recordDescProviders = mockRecDescProviders();
         int partition = 0;
         IHyracksTaskContext[] ctxs = mockIHyracksTaskContext();
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ctxs.length);
         int[] keys = { 0 };
         boolean lowKeyInclusive = true;
         boolean highKeyInclusive = true;
@@ -295,7 +297,8 @@
                             recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0),
                                     0),
                             keys, keys, lowKeyInclusive, highKeyInclusive, keys, keys, pair.getLeft(), false, false,
-                            null, pair.getRight(), false, null);
+                            null, pair.getRight(), false, null, null, -1, false, null, null,
+                            DefaultTupleProjectorFactory.INSTANCE, null, partitionsMap);
                     writers.add(writer);
                 }
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 8ac5dfc..2348a1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -190,12 +190,4 @@
             context.updateLoggers();
         }
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/TestUtil.java
similarity index 69%
rename from hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
rename to hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/TestUtil.java
index 73d985d..da1c12e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/TestUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.tests.integration;
+package org.apache.hyracks.util;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,34 +38,42 @@
     private static final String HOST = "127.0.0.1";
     private static final int PORT = 16001;
 
-    static URI uri(String path) throws URISyntaxException {
+    public static URI uri(String path) throws URISyntaxException {
         return new URI("http", null, HOST, PORT, path, null, null);
     }
 
-    static InputStream httpGetAsInputStream(URI uri) throws URISyntaxException, IOException {
+    public static InputStream httpGetAsInputStream(URI uri) throws URISyntaxException, IOException {
         HttpClient client = HttpClients.createMinimal();
         HttpResponse response = client.execute(new HttpGet(uri));
         return response.getEntity().getContent();
     }
 
-    static String httpGetAsString(String path) throws URISyntaxException, IOException {
+    public static String httpGetAsString(String path) throws URISyntaxException, IOException {
         return httpGetAsString(uri(path));
     }
 
-    static String httpGetAsString(URI uri) throws URISyntaxException, IOException {
+    public static String httpGetAsString(URI uri) throws URISyntaxException, IOException {
         InputStream resultStream = httpGetAsInputStream(uri);
         return IOUtils.toString(resultStream, Charset.defaultCharset());
     }
 
-    static ObjectNode getResultAsJson(String resultStr) throws IOException {
+    public static ObjectNode getResultAsJson(String resultStr) throws IOException {
         return new ObjectMapper().readValue(resultStr, ObjectNode.class);
     }
 
-    static ObjectNode httpGetAsObject(String path) throws URISyntaxException, IOException {
+    public static ObjectNode httpGetAsObject(String path) throws URISyntaxException, IOException {
         return getResultAsJson(httpGetAsString(path));
     }
 
-    static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
+    public static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
         return getResultAsJson(httpGetAsString(uri));
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }