[ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch changes the dump_index() function to support
operating on multiple partitions.

Change-Id: I8754069a7340c0d9e3bf69e1fe5c94eb333b73b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17513
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index 691be47..bd72a66 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -39,27 +39,30 @@
     private final IndexDataflowHelperFactory indexDataflowHelperFactory;
     private final RecordDescriptor recDesc;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final AlgebricksAbsolutePartitionConstraint storageLocations;
+    private final AlgebricksAbsolutePartitionConstraint constraint;
+    private final int[][] partitionsMap;
 
     public DumpIndexDatasource(INodeDomain domain, IndexDataflowHelperFactory indexDataflowHelperFactory,
             RecordDescriptor recDesc, IBinaryComparatorFactory[] comparatorFactories,
-            AlgebricksAbsolutePartitionConstraint storageLocations) throws AlgebricksException {
+            AlgebricksAbsolutePartitionConstraint constraint, int[][] partitionsMap) throws AlgebricksException {
         super(DUMP_INDEX_DATASOURCE_ID, DumpIndexRewriter.DUMP_INDEX, domain);
         this.indexDataflowHelperFactory = indexDataflowHelperFactory;
         this.recDesc = recDesc;
         this.comparatorFactories = comparatorFactories;
-        this.storageLocations = storageLocations;
+        this.constraint = constraint;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
-        return storageLocations;
+        return constraint;
     }
 
     @Override
     protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
             AlgebricksAbsolutePartitionConstraint locations) {
-        return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories);
+        return new DumpIndexFunction(locations, indexDataflowHelperFactory, recDesc, comparatorFactories,
+                partitionsMap);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
index 2fdbef3..fcfe3e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
@@ -31,25 +31,31 @@
 
 public class DumpIndexFunction extends AbstractDatasourceFunction {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IndexDataflowHelperFactory indexDataflowHelperFactory;
     private final RecordDescriptor recDesc;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final int[][] partitionsMap;
 
     public DumpIndexFunction(AlgebricksAbsolutePartitionConstraint locations,
             IndexDataflowHelperFactory indexDataflowHelperFactory, RecordDescriptor recDesc,
-            IBinaryComparatorFactory[] comparatorFactories) {
+            IBinaryComparatorFactory[] comparatorFactories, int[][] partitionsMap) {
         super(locations);
         this.indexDataflowHelperFactory = indexDataflowHelperFactory;
         this.recDesc = recDesc;
         this.comparatorFactories = comparatorFactories;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
-        final IIndexDataflowHelper indexDataflowHelper = indexDataflowHelperFactory.create(serviceCtx, partition);
-        return new DumpIndexReader(indexDataflowHelper, recDesc, comparatorFactories);
+        int[] partitions = partitionsMap[partition];
+        final IIndexDataflowHelper[] indexDataflowHelpers = new IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            indexDataflowHelpers[i] = indexDataflowHelperFactory.create(serviceCtx, partitions[i]);
+        }
+        return new DumpIndexReader(indexDataflowHelpers, recDesc, comparatorFactories);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 8ef094e..5c5a218 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -34,12 +34,13 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.MultiComparator;
@@ -48,38 +49,50 @@
 public class DumpIndexReader extends FunctionReader {
 
     private final CharArrayRecord record;
-    private final IIndexCursor searchCursor;
+    private final IIndexCursor[] searchCursors;
     private final RecordDescriptor secondaryRecDesc;
     private final StringBuilder recordBuilder = new StringBuilder();
     private final ByteBufferInputStream bbis = new ByteBufferInputStream();
     private final DataInputStream dis = new DataInputStream(bbis);
-    private final IIndexDataflowHelper indexDataflowHelper;
-    private final IIndexAccessor accessor;
+    private final IIndexDataflowHelper[] indexDataflowHelpers;
+    private final IIndexAccessor[] accessors;
+    private int currentSearchIdx;
 
-    public DumpIndexReader(IIndexDataflowHelper indexDataflowHelper, RecordDescriptor secondaryRecDesc,
+    public DumpIndexReader(IIndexDataflowHelper[] indexDataflowHelpers, RecordDescriptor secondaryRecDesc,
             IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException {
-        this.indexDataflowHelper = indexDataflowHelper;
+        this.indexDataflowHelpers = indexDataflowHelpers;
         this.secondaryRecDesc = secondaryRecDesc;
-        indexDataflowHelper.open();
-        IIndex indexInstance = indexDataflowHelper.getIndexInstance();
-        accessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        searchCursor = accessor.createSearchCursor(false);
         MultiComparator searchMultiComparator = MultiComparator.create(comparatorFactories);
         RangePredicate rangePredicate =
                 new RangePredicate(null, null, true, true, searchMultiComparator, searchMultiComparator, null, null);
-        accessor.search(searchCursor, rangePredicate);
+        this.accessors = new IIndexAccessor[indexDataflowHelpers.length];
+        this.searchCursors = new IIndexCursor[indexDataflowHelpers.length];
+        for (int i = 0; i < indexDataflowHelpers.length; i++) {
+            IIndexDataflowHelper indexDataflowHelper = indexDataflowHelpers[i];
+            indexDataflowHelper.open();
+            accessors[i] = indexDataflowHelper.getIndexInstance().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            searchCursors[i] = accessors[i].createSearchCursor(false);
+            accessors[i].search(searchCursors[i], rangePredicate);
+        }
+        currentSearchIdx = 0;
         record = new CharArrayRecord();
     }
 
     @Override
     public boolean hasNext() throws Exception {
-        return searchCursor.hasNext();
+        while (currentSearchIdx < searchCursors.length) {
+            if (searchCursors[currentSearchIdx].hasNext()) {
+                return true;
+            }
+            currentSearchIdx++;
+        }
+        return false;
     }
 
     @Override
     public IRawRecord<char[]> next() throws IOException, InterruptedException {
-        searchCursor.next();
-        ITupleReference tuple = searchCursor.getTuple();
+        searchCursors[currentSearchIdx].next();
+        ITupleReference tuple = searchCursors[currentSearchIdx].getTuple();
         buildJsonRecord(tuple);
         record.reset();
         record.append(recordBuilder.toString().toCharArray());
@@ -89,16 +102,21 @@
 
     @Override
     public void close() throws IOException {
-        bbis.close();
-        dis.close();
-        if (searchCursor != null) {
-            searchCursor.close();
-            searchCursor.destroy();
+        Throwable failure = releaseResources();
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
         }
-        if (accessor != null) {
-            accessor.destroy();
+    }
+
+    private Throwable releaseResources() {
+        Throwable failure = CleanupUtils.close(bbis, null);
+        failure = CleanupUtils.close(dis, failure);
+        for (int i = 0; i < indexDataflowHelpers.length; i++) {
+            failure = ResourceReleaseUtils.close(searchCursors[i], failure);
+            failure = CleanupUtils.destroy(failure, searchCursors[i], accessors[i]);
+            failure = ResourceReleaseUtils.close(indexDataflowHelpers[i], failure);
         }
-        indexDataflowHelper.close();
+        return failure;
     }
 
     private void buildJsonRecord(ITupleReference tuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
index 6c0382d..dac1ac7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.function;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.functions.FunctionConstants;
@@ -66,13 +67,15 @@
         }
         ISecondaryIndexOperationsHelper secondaryIndexHelper =
                 SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, loc);
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
         IndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        secondaryIndexHelper.getSecondaryFileSplitProvider());
+                        partitioningProperties.getSpiltsProvider());
         AlgebricksAbsolutePartitionConstraint secondaryPartitionConstraint =
-                (AlgebricksAbsolutePartitionConstraint) secondaryIndexHelper.getSecondaryPartitionConstraint();
+                (AlgebricksAbsolutePartitionConstraint) partitioningProperties.getConstraints();
         return new DumpIndexDatasource(context.getComputationNodeDomain(), indexDataflowHelperFactory,
                 secondaryIndexHelper.getSecondaryRecDesc(), secondaryIndexHelper.getSecondaryComparatorFactories(),
-                secondaryPartitionConstraint);
+                secondaryPartitionConstraint, partitioningProperties.getComputeStorageMap());
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 1b4fd23..64dce1e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -63,7 +63,7 @@
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) throws HyracksDataException {
         super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys, numPrimaryKeys, hasBuddyBTree);
-
+        //TODO(partitioning) correlated
         this.primaryIndexHelper =
                 primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.secondaryIndexHelper =