[ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Some operators (INSERT, UPSERT, etc.) require their input to be
hash partitioned based on a partitions map. This patch makes the
hash exchanges satisfy this requirement.
- Hash exchanges will take an optional partitions map to use when
hash partitioning.
- Make sure the partitions map is considered when comparing
partitioning properties.
Change-Id: I71457603048e9be9467943918e21ce5ede658c19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
index 059d52a..17bf600 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
@@ -75,6 +75,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
index 31a959f..186ef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -23,10 +23,13 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
- public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
- super(hashFields, hashFunctions);
+ public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions,
+ Int2IntMap storagePartition2Compute) {
+ super(hashFields, hashFunctions, storagePartition2Compute);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 52df3b7..c91a0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -24,14 +24,27 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+
+ private static final long serialVersionUID = 2L;
private final int[] hashFields;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final int[][] partitionsMap;
public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories) {
this.hashFields = hashFields;
this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = null;
+ }
+
+ public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ int[][] partitionsMap) {
+ this.hashFields = hashFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -40,6 +53,16 @@
for (int i = 0; i < hashFunctionFactories.length; ++i) {
hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
}
- return new FieldHashPartitionComputer(hashFields, hashFunctions);
+ if (partitionsMap == null) {
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
+ } else {
+ Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
+ for (int i = 0; i < partitionsMap.length; i++) {
+ for (int storagePartition : partitionsMap[i]) {
+ storagePartition2Compute.put(storagePartition, i);
+ }
+ }
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
index 5620a95..e36315c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -28,7 +28,7 @@
private final int numPartitions;
public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
- super(hashFields, hashFunctions);
+ super(hashFields, hashFunctions, null);
this.numPartitions = numPartitions;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
index b09bcb8..cb97d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -22,14 +22,18 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
class HashPartitioner {
private final int[] hashFields;
private final IBinaryHashFunction[] hashFunctions;
+ private final Int2IntMap storagePartition2Compute;
- public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+ public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, Int2IntMap storagePartition2Compute) {
this.hashFields = hashFields;
this.hashFunctions = hashFunctions;
+ this.storagePartition2Compute = storagePartition2Compute;
}
protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
@@ -50,6 +54,15 @@
if (h < 0) {
h = -(h + 1);
}
- return h % nParts;
+ if (storagePartition2Compute == null) {
+ return h % nParts;
+ } else {
+ int storagePartition = h % storagePartition2Compute.size();
+ int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
+ if (computePartition < 0 || computePartition >= nParts) {
+ throw new IllegalStateException("couldn't resolve storage partition to compute partition");
+ }
+ return computePartition;
+ }
}
}