[NO ISSUE][EXT]: Improve workload distribution logic

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve the workload distribution logic.

Change-Id: I191dc87850c1812b49831dd3c78bc7a22cc5b931
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10763
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 0b215a0..2a063bf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -82,27 +82,6 @@
                 ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
     }
 
-    /**
-     * Finds the smallest workload and returns it
-     *
-     * @return the smallest workload
-     */
-    protected PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
-        PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
-        for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
-            // If the current total size is 0, add the file directly as this is a first time partition
-            if (partition.getTotalSize() == 0) {
-                smallest = partition;
-                break;
-            }
-            if (partition.getTotalSize() < smallest.getTotalSize()) {
-                smallest = partition;
-            }
-        }
-
-        return smallest;
-    }
-
     protected IncludeExcludeMatcher getIncludeExcludeMatchers() throws CompilationException {
         // Get and compile the patterns for include/exclude if provided
         List<Matcher> includeMatchers = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index a1c577a..747fcca 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.external.input.record.reader.aws;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
@@ -216,14 +218,19 @@
      * @param partitionsCount Partitions count
      */
     private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
         // Prepare the workloads based on the number of partitions
         for (int i = 0; i < partitionsCount; i++) {
-            partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
         }
 
         for (S3Object object : fileObjects) {
-            PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
-            smallest.addFilePath(object.key(), object.size());
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            workload.addFilePath(object.key(), object.size());
+            workloadQueue.add(workload);
         }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index ca064b1..803694a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.external.input.record.reader.azure;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
@@ -131,14 +133,19 @@
      * @param partitionsCount Partitions count
      */
     private void distributeWorkLoad(List<BlobItem> items, int partitionsCount) {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
         // Prepare the workloads based on the number of partitions
         for (int i = 0; i < partitionsCount; i++) {
-            partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
         }
 
-        for (BlobItem item : items) {
-            PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
-            smallest.addFilePath(item.getName(), item.getProperties().getContentLength());
+        for (BlobItem object : items) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            workload.addFilePath(object.getName(), object.getProperties().getContentLength());
+            workloadQueue.add(workload);
         }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
new file mode 100644
index 0000000..9cdeb16
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.awss3;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3Test {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWorkloadDistribution() throws Exception {
+        AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory();
+
+        List<S3Object> s3Objects = new ArrayList<>();
+        final int partitionsCount = 3;
+
+        // Create 9 S3 objects distributed over 3 partitions; each partition should end up with a total size of 600
+        S3Object.Builder builder = S3Object.builder();
+        s3Objects.add(builder.key("1.json").size(100L).build());
+        s3Objects.add(builder.key("2.json").size(100L).build());
+        s3Objects.add(builder.key("3.json").size(100L).build());
+        s3Objects.add(builder.key("4.json").size(200L).build());
+        s3Objects.add(builder.key("5.json").size(200L).build());
+        s3Objects.add(builder.key("6.json").size(200L).build());
+        s3Objects.add(builder.key("7.json").size(300L).build());
+        s3Objects.add(builder.key("8.json").size(300L).build());
+        s3Objects.add(builder.key("9.json").size(300L).build());
+
+        // invoke the distributeWorkLoad method
+        Method distributeWorkloadMethod =
+                AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class);
+        distributeWorkloadMethod.setAccessible(true);
+        distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount);
+
+        // get the partitionWorkLoadsBasedOnSize field and verify the result
+        Field distributeWorkloadField =
+                AwsS3InputStreamFactory.class.getSuperclass().getDeclaredField("partitionWorkLoadsBasedOnSize");
+        distributeWorkloadField.setAccessible(true);
+        List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize> workloads =
+                (List<AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField
+                        .get(factory);
+
+        for (AbstractExternalInputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+            Assert.assertEquals(600, workload.getTotalSize());
+        }
+    }
+}