[ASTERIXDB-3144][RT] Make hash exchanges consider partitions map

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Some operators, like INSERT and UPSERT, require their input
to be hash-partitioned based on a partitions map. This patch
makes the hash exchanges satisfy this requirement.
Hash exchanges will take an optional partitions map to use when
hash partitioning.

- Make sure the partitions map is considered when comparing
  partitioning properties.

Change-Id: I71457603048e9be9467943918e21ce5ede658c19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
index 059d52a..17bf600 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
@@ -75,6 +75,10 @@
       <artifactId>commons-io</artifactId>
     </dependency>
     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
index 31a959f..186ef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -23,10 +23,13 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
 public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
 
-    public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
-        super(hashFields, hashFunctions);
+    public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions,
+            Int2IntMap storagePartition2Compute) {
+        super(hashFields, hashFunctions, storagePartition2Compute);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 52df3b7..c91a0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -24,14 +24,27 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
+
+    private static final long serialVersionUID = 2L;
     private final int[] hashFields;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final int[][] partitionsMap;
 
     public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories) {
         this.hashFields = hashFields;
         this.hashFunctionFactories = hashFunctionFactories;
+        this.partitionsMap = null;
+    }
+
+    public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            int[][] partitionsMap) {
+        this.hashFields = hashFields;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -40,6 +53,16 @@
         for (int i = 0; i < hashFunctionFactories.length; ++i) {
             hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
         }
-        return new FieldHashPartitionComputer(hashFields, hashFunctions);
+        if (partitionsMap == null) {
+            return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
+        } else {
+            Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
+            for (int i = 0; i < partitionsMap.length; i++) {
+                for (int storagePartition : partitionsMap[i]) {
+                    storagePartition2Compute.put(storagePartition, i);
+                }
+            }
+            return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
index 5620a95..e36315c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -28,7 +28,7 @@
     private final int numPartitions;
 
     public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
-        super(hashFields, hashFunctions);
+        super(hashFields, hashFunctions, null);
         this.numPartitions = numPartitions;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
index b09bcb8..cb97d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -22,14 +22,18 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
 class HashPartitioner {
 
     private final int[] hashFields;
     private final IBinaryHashFunction[] hashFunctions;
+    private final Int2IntMap storagePartition2Compute;
 
-    public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+    public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, Int2IntMap storagePartition2Compute) {
         this.hashFields = hashFields;
         this.hashFunctions = hashFunctions;
+        this.storagePartition2Compute = storagePartition2Compute;
     }
 
     protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
@@ -50,6 +54,15 @@
         if (h < 0) {
             h = -(h + 1);
         }
-        return h % nParts;
+        if (storagePartition2Compute == null) {
+            return h % nParts;
+        } else {
+            int storagePartition = h % storagePartition2Compute.size();
+            int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
+            if (computePartition < 0 || computePartition >= nParts) {
+                throw new IllegalStateException("couldn't resolve storage partition to compute partition");
+            }
+            return computePartition;
+        }
     }
 }