[NO ISSUE][EXT]: Improve workload distribution logic
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Improve the workload distribution logic.
Change-Id: I191dc87850c1812b49831dd3c78bc7a22cc5b931
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10763
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 0b215a0..2a063bf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -82,27 +82,6 @@
((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
}
- /**
- * Finds the smallest workload and returns it
- *
- * @return the smallest workload
- */
- protected PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
- PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
- for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
- // If the current total size is 0, add the file directly as this is a first time partition
- if (partition.getTotalSize() == 0) {
- smallest = partition;
- break;
- }
- if (partition.getTotalSize() < smallest.getTotalSize()) {
- smallest = partition;
- }
- }
-
- return smallest;
- }
-
protected IncludeExcludeMatcher getIncludeExcludeMatchers() throws CompilationException {
// Get and compile the patterns for include/exclude if provided
List<Matcher> includeMatchers = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index a1c577a..747fcca 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -19,8 +19,10 @@
package org.apache.asterix.external.input.record.reader.aws;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
@@ -216,14 +218,19 @@
* @param partitionsCount Partitions count
*/
private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) {
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+ Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
// Prepare the workloads based on the number of partitions
for (int i = 0; i < partitionsCount; i++) {
- partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
}
for (S3Object object : fileObjects) {
- PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
- smallest.addFilePath(object.key(), object.size());
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ workload.addFilePath(object.key(), object.size());
+ workloadQueue.add(workload);
}
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index ca064b1..803694a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -19,8 +19,10 @@
package org.apache.asterix.external.input.record.reader.azure;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
@@ -131,14 +133,19 @@
* @param partitionsCount Partitions count
*/
private void distributeWorkLoad(List<BlobItem> items, int partitionsCount) {
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+ Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
// Prepare the workloads based on the number of partitions
for (int i = 0; i < partitionsCount; i++) {
- partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
}
- for (BlobItem item : items) {
- PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
- smallest.addFilePath(item.getName(), item.getProperties().getContentLength());
+ for (BlobItem object : items) {
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ workload.addFilePath(object.getName(), object.getProperties().getContentLength());
+ workloadQueue.add(workload);
}
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
}
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
new file mode 100644
index 0000000..9cdeb16
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.awss3;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3Test {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWorkloadDistribution() throws Exception {
+ AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory();
+
+ List<S3Object> s3Objects = new ArrayList<>();
+ final int partitionsCount = 3;
+
+ // Create 9 S3 objects to distribute over 3 partitions; each partition should end up with 600 total size
+ S3Object.Builder builder = S3Object.builder();
+ s3Objects.add(builder.key("1.json").size(100L).build());
+ s3Objects.add(builder.key("2.json").size(100L).build());
+ s3Objects.add(builder.key("3.json").size(100L).build());
+ s3Objects.add(builder.key("4.json").size(200L).build());
+ s3Objects.add(builder.key("5.json").size(200L).build());
+ s3Objects.add(builder.key("6.json").size(200L).build());
+ s3Objects.add(builder.key("7.json").size(300L).build());
+ s3Objects.add(builder.key("8.json").size(300L).build());
+ s3Objects.add(builder.key("9.json").size(300L).build());
+
+ // invoke the distributeWorkLoad method
+ Method distributeWorkloadMethod =
+ AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class);
+ distributeWorkloadMethod.setAccessible(true);
+ distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount);
+
+ // get the partitionWorkLoadsBasedOnSize field and verify the result
+ Field distributeWorkloadField =
+ AwsS3InputStreamFactory.class.getSuperclass().getDeclaredField("partitionWorkLoadsBasedOnSize");
+ distributeWorkloadField.setAccessible(true);
+ List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize> workloads =
+ (List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField
+ .get(factory);
+
+ for (AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+ Assert.assertEquals(600, workload.getTotalSize());
+ }
+ }
+}