[ASTERIXDB-3503][EXT] Improve logic of distributing files to partitions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
With this patch, the files to scan are assigned to partitions so that
the total size of the files to read is balanced fairly across the
partitions. Each file is given to the partition that currently has the
smallest total size assigned to it.

Ext-ref: MB-63840
Change-Id: Id2c56b707f6fd5f4cb40f4576dc12ceb2e61a193
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19093
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
new file mode 100644
index 0000000..9dd2000
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixDeltaRuntimeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AsterixDeltaRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final HyracksDataException hyracksDataException;
+
+    public AsterixDeltaRuntimeException(HyracksDataException e) {
+        this.hyracksDataException = e;
+    }
+
+    public HyracksDataException getHyracksDataException() {
+        return hyracksDataException;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
index 2c86132..3ca704f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java
@@ -124,7 +124,7 @@
         try {
             actualType = DeltaDataParser.getTypeTag(type, false, context);
         } catch (HyracksDataException e) {
-            throw new RuntimeException(e);
+            throw new AsterixDeltaRuntimeException(e);
         }
         ATypeTag expectedType = node.getTypeTag();
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 5ce2c78..9d93fce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -21,11 +21,13 @@
 import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -58,8 +60,10 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
 
 public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
 
@@ -68,10 +72,9 @@
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
     private static final Logger LOGGER = LogManager.getLogger();
     private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-    private Map<Integer, List<String>> schedule;
     private String scanState;
     private Map<String, String> configuration;
-    private List<String> scanFiles;
+    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -120,28 +123,30 @@
             requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
         } catch (IOException e) {
             throw new RuntimeException(e);
+        } catch (AsterixDeltaRuntimeException e) {
+            throw e.getHyracksDataException();
         }
         Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
 
-        scanFiles = new ArrayList<>();
+        List<Row> scanFiles = new ArrayList<>();
         while (iter.hasNext()) {
             FilteredColumnarBatch batch = iter.next();
             CloseableIterator<Row> rowIter = batch.getRows();
             while (rowIter.hasNext()) {
                 Row row = rowIter.next();
-                scanFiles.add(RowSerDe.serializeRowToJson(row));
+                scanFiles.add(row);
             }
         }
-        locationConstraints = configureLocationConstraints(appCtx);
+        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles();
+        distributeFiles(scanFiles);
         issueWarnings(warnings, warningCollector);
     }
 
     private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
-        if (!warnings.isEmpty() && warningCollector.shouldWarn()) {
+        if (!warnings.isEmpty()) {
             for (Warning warning : warnings) {
                 if (warningCollector.shouldWarn()) {
                     warningCollector.warn(warning);
@@ -151,7 +156,8 @@
         warnings.clear();
     }
 
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+            List<Row> scanFiles) {
         IClusterStateManager csm = appCtx.getClusterStateManager();
 
         String[] locations = csm.getClusterLocations().getLocations();
@@ -168,24 +174,30 @@
         return new AlgebricksAbsolutePartitionConstraint(locations);
     }
 
-    private void distributeFiles() {
-        final int numComputePartitions = getPartitionConstraint().getLocations().length;
-        schedule = new HashMap<>();
-        for (int i = 0; i < numComputePartitions; i++) {
-            schedule.put(i, new ArrayList<>());
+    private void distributeFiles(List<Row> scanFiles) {
+        final int partitionsCount = getPartitionConstraint().getLocations().length;
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
         }
-        int i = 0;
-        for (String scanFile : scanFiles) {
-            schedule.get(i).add(scanFile);
-            i = (i + 1) % numComputePartitions;
+        for (Row scanFileRow : scanFiles) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
+            workloadQueue.add(workload);
         }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
             int partition = context.getPartition();
-            return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+                    configuration, context);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -206,4 +218,31 @@
         return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
     }
 
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<String> scanFiles = new ArrayList<>();
+        private long totalSize = 0;
+
+        public PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<String> getScanFiles() {
+            return scanFiles;
+        }
+
+        public void addScanFile(String scanFile, long size) {
+            this.scanFiles.add(scanFile);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+        }
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 50e72ed..b311870 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -921,7 +921,7 @@
     }
 
     public static boolean supportsPushdown(Map<String, String> properties) {
-        //Currently, only Apache Parquet format is supported
+        //Currently, only the Apache Parquet and Delta table formats are supported
         return isParquetFormat(properties) || isDeltaTable(properties);
     }