[ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for Location Constraints Configuration in Delta

- user model changes: no
- storage format changes: no
- interface changes: no
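
Details:
- Obtain the Delta scan location constraints from the data partitioning
  provider (appCtx.getDataPartitioningProvider().getClusterLocations())
  instead of deriving them from IClusterStateManager.
- Pass the partition count to distributeFiles() explicitly and expose the
  per-partition workloads so the size-based file distribution can be unit
  tested.
- Add unit tests covering more, fewer, and equally many scan files than
  partitions, plus runtime tests against Delta tables with one and nine
  files.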

Ext-ref: MB-64800

Change-Id: I9ba2ce7afc54ddd08f5e522627c56e44f51cdbbc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c50b391..7e7b6f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -440,6 +440,10 @@
         loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/delta_all_type/_delta_log", JSON_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/delta_all_type", PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
     }
 
     private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 1236636..67d460c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -55,6 +55,10 @@
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "modified_delta_table";
     public static final String DELTA_MULTI_FILE_TABLE =
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "multiple_file_delta_table";
+    public static final String DELTA_FILE_SIZE_ONE =
+            "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one";
+    public static final String DELTA_FILE_SIZE_NINE =
+            "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
 
     public static void prepareDeltaTableContainer(Configuration conf) {
         File basePath = new File(".");
@@ -62,6 +66,8 @@
         prepareMultipleFilesTable(conf);
         prepareModifiedTable(conf);
         prepareEmptyTable(conf);
+        prepareFileSizeOne(conf);
+        prepareFileSizeNine(conf);
     }
 
     public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -221,4 +227,99 @@
             throw new RuntimeException(e);
         }
     }
+
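+    // Generates a Delta table backed by a single one-row Parquet file.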
+    public static void prepareFileSizeOne(Configuration conf) {
+        Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+        try {
+            Path path = new Path(DELTA_FILE_SIZE_ONE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+
+            List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("name", "Cooper");
+
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+
+            long size = writer.getDataSize();
+            writer.close();
+
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_ONE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
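+    // Generates a Delta table with nine one-row Parquet files, committed across nine snapshots.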
+    public static void prepareFileSizeNine(Configuration conf) {
+        Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+        try {
+            Path path = new Path(DELTA_FILE_SIZE_NINE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+
+            // Build nine single-row snapshots, one per data file.
+            String[] names = { "Cooper", "Adam", "Third", "Fourth", "Five", "Six", "Seven", "Eight", "Nine" };
+            List<List<GenericData.Record>> allSnapshotRecords = new ArrayList<>();
+            for (int i = 0; i < names.length; i++) {
+                GenericData.Record record = new GenericData.Record(schema);
+                record.put("id", i);
+                record.put("name", names[i]);
+                allSnapshotRecords.add(List.of(record));
+            }
+
+            for (GenericData.Record record : allSnapshotRecords.get(0)) {
+                writer.write(record);
+            }
+
+            long size = writer.getDataSize();
+            writer.close();
+
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_NINE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+            for (int i = 2; i <= 9; i++) {
+                Path path2 = new Path(DELTA_FILE_SIZE_NINE, "File" + i + ".parquet");
+                ParquetWriter<GenericData.Record> writer2 =
+                        AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+
+                for (GenericData.Record record : allSnapshotRecords.get(i - 1)) {
+                    writer2.write(record);
+                }
+
+                long size2 = writer2.getDataSize();
+                writer2.close();
+
+                List<Action> actions2 = List.of(new AddFile("File" + i + ".parquet", new HashMap<>(), size2,
+                        System.currentTimeMillis(), true, null, null));
+
+                OptimisticTransaction txn2 = log.startTransaction();
+                txn2.commit(actions2, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
new file mode 100644
index 0000000..c1a74c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_file_size_nine"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
new file mode 100644
index 0000000..db2abf5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds ORDER BY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
new file mode 100644
index 0000000..1284e93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/delta_file_size_one"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
new file mode 100644
index 0000000..84e7914
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
new file mode 100644
index 0000000..500f6a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Cooper" }
+{ "id": 1, "name": "Adam" }
+{ "id": 2, "name": "Third" }
+{ "id": 3, "name": "Fourth" }
+{ "id": 4, "name": "Five" }
+{ "id": 5, "name": "Six" }
+{ "id": 6, "name": "Seven" }
+{ "id": 7, "name": "Eight" }
+{ "id": 8, "name": "Nine" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
new file mode 100644
index 0000000..006681c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
@@ -0,0 +1 @@
+{ "id": 0, "name": "Cooper" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ff1b325..c31cb5e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -556,6 +556,18 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-file-one">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-file-one</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-file-nine">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-file-nine</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
         <placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index dc4c310..790db8c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -30,7 +30,6 @@
 import java.util.PriorityQueue;
 import java.util.Set;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -44,7 +43,6 @@
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -77,6 +75,11 @@
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
 
+    // Exposed for unit tests that verify the file distribution.
+    public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
+        return partitionWorkLoadsBasedOnSize;
+    }
+
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         return locationConstraints;
@@ -134,9 +136,9 @@
                 scanFiles.add(row);
             }
         }
-        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+        locationConstraints = getPartitions(appCtx);
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles);
+        distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
         issueWarnings(warnings, warningCollector);
     }
 
@@ -151,26 +153,13 @@
         warnings.clear();
     }
 
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
-            List<Row> scanFiles) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "configured partitions ({}) exceeds total partition count ({}); limiting configured partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
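+    // Location constraints now come from the data partitioning provider instead of being derived from cluster state.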
+    public AlgebricksAbsolutePartitionConstraint getPartitions(ICcApplicationContext appCtx) {
+        return appCtx.getDataPartitioningProvider().getClusterLocations();
     }
 
-    private void distributeFiles(List<Row> scanFiles) {
-        final int partitionsCount = getPartitionConstraint().getLocations().length;
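+    // The partition count is passed in explicitly so the distribution logic can be unit tested.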
+    public void distributeFiles(List<Row> scanFiles, int partitionsCount) {
         PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
                 Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
 
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
new file mode 100644
index 0000000..1bc8eb8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.awss3;
+
+import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
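+/**
+ * Tests that {@link DeltaReaderFactory#distributeFiles} spreads Delta scan files evenly
+ * across partition workloads based on file size.
+ */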
+public class DeltaTopicPartitionDistributionTest {
+
+    @Test
+    public void distributeFilesMoreFilesThanPartitions() {
+        int rowCount = 25;
+        int numberOfPartition = 13;
+        List<Row> scanFiles = createMockRows(rowCount);
+        DeltaReaderFactory d = createTestReaderFactory();
+        d.distributeFiles(scanFiles, numberOfPartition);
+        Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size());
+        verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize());
+    }
+
+    @Test
+    public void distributeFilesLessFilesThanPartitions() {
+        int rowCount = 15;
+        int numberOfPartition = 23;
+        List<Row> scanFiles = createMockRows(rowCount);
+        DeltaReaderFactory d = createTestReaderFactory();
+        d.distributeFiles(scanFiles, numberOfPartition);
+        Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size());
+        verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize());
+    }
+
+    @Test
+    public void distributeFilesEqualFilesAndPartitions() {
+        int rowCount = 9;
+        int numberOfPartition = 9;
+        List<Row> scanFiles = createMockRows(rowCount);
+        DeltaReaderFactory d = createTestReaderFactory();
+        d.distributeFiles(scanFiles, numberOfPartition);
+        Assert.assertEquals(numberOfPartition, d.getPartitionWorkLoadsBasedOnSize().size());
+        verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize());
+    }
+
+    private DeltaReaderFactory createTestReaderFactory() {
+        // Minimal concrete factory: only the file-distribution logic is under test.
+        return new DeltaReaderFactory() {
+            @Override
+            protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+                    throws AlgebricksException {
+            }
+
+            @Override
+            protected String getTablePath(Map<String, String> configuration) throws AlgebricksException {
+                return null;
+            }
+
+            @Override
+            public List<String> getRecordReaderNames() {
+                return null;
+            }
+        };
+    }
+
+    private void verifyFileDistribution(int numberOfFiles,
+            List<DeltaReaderFactory.PartitionWorkLoadBasedOnSize> workloads) {
+        int totalDistributedFiles = 0;
+
+        for (DeltaReaderFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+            totalDistributedFiles += workload.getScanFiles().size();
+            Assert.assertTrue(workload.getTotalSize() >= 0);
+        }
+        Assert.assertEquals(numberOfFiles, totalDistributedFiles);
+    }
+
+    private List<Row> createMockRows(int count) {
+        List<Row> rows = new ArrayList<>();
+        StructType sch = createMockSchema();
+
+        for (int i = 1; i <= count; i++) {
+            int finalI = i;
+            Row row = new Row() {
+
+                @Override
+                public StructType getSchema() {
+                    return sch;
+                }
+
+                @Override
+                public boolean isNullAt(int i) {
+                    return false;
+                }
+
+                @Override
+                public boolean getBoolean(int i) {
+                    return false;
+                }
+
+                @Override
+                public byte getByte(int i) {
+                    return 0;
+                }
+
+                @Override
+                public short getShort(int i) {
+                    return 0;
+                }
+
+                @Override
+                public int getInt(int i) {
+                    if (i == 1) {
+                        return finalI;
+                    } else if (i == 2) {
+                        return finalI * 10;
+                    }
+                    return 0;
+                }
+
+                @Override
+                public long getLong(int i) {
+                    return 0;
+                }
+
+                @Override
+                public float getFloat(int i) {
+                    return 0;
+                }
+
+                @Override
+                public double getDouble(int i) {
+                    return 0;
+                }
+
+                @Override
+                public String getString(int i) {
+                    if (i == 0) {
+                        return "tableRoot_" + finalI;
+                    } else if (i == 1) {
+                        return "addFilePath_" + finalI;
+                    }
+                    return null;
+                }
+
+                @Override
+                public BigDecimal getDecimal(int i) {
+                    return null;
+                }
+
+                @Override
+                public byte[] getBinary(int i) {
+                    return new byte[0];
+                }
+
+                @Override
+                public ArrayValue getArray(int i) {
+                    return null;
+                }
+
+                @Override
+                public MapValue getMap(int i) {
+                    return null;
+                }
+
+                @Override
+                public Row getStruct(int index) {
+                    if (index == ADD_FILE_ORDINAL) {
+                        return createAddFileEntry(finalI);
+                    }
+                    return null;
+                }
+            };
+
+            rows.add(row);
+        }
+
+        return rows;
+    }
+
+    private StructType createMockSchema() {
+        List<StructField> fields = new ArrayList<>();
+
+        fields.add(new StructField("field1", StringType.STRING, true));
+        fields.add(new StructField("field2", IntegerType.INTEGER, true));
+        fields.add(new StructField("field3", IntegerType.INTEGER, true));
+
+        return new StructType(fields);
+    }
+
+    private Row createAddFileEntry(int i) {
+        List<StructField> addFileFields = new ArrayList<>();
+
+        addFileFields.add(new StructField("addFilePath", StringType.STRING, true));
+        addFileFields.add(new StructField("size", IntegerType.INTEGER, true));
+
+        StructType addFileSchema = new StructType(addFileFields);
+
+        Row addFileRow = new Row() {
+            @Override
+            public StructType getSchema() {
+                return addFileSchema;
+            }
+
+            @Override
+            public boolean isNullAt(int index) {
+                return false;
+            }
+
+            @Override
+            public boolean getBoolean(int i) {
+                return false;
+            }
+
+            @Override
+            public byte getByte(int i) {
+                return 0;
+            }
+
+            @Override
+            public short getShort(int i) {
+                return 0;
+            }
+
+            @Override
+            public int getInt(int index) {
+                if (index == 1) {
+                    return i * 100;
+                }
+                return 0;
+            }
+
+            @Override
+            public long getLong(int i) {
+                return 0;
+            }
+
+            @Override
+            public float getFloat(int i) {
+                return 0;
+            }
+
+            @Override
+            public double getDouble(int i) {
+                return 0;
+            }
+
+            @Override
+            public BigDecimal getDecimal(int i) {
+                return null;
+            }
+
+            @Override
+            public byte[] getBinary(int i) {
+                return new byte[0];
+            }
+
+            @Override
+            public Row getStruct(int index) {
+                return null;
+            }
+
+            @Override
+            public ArrayValue getArray(int index) {
+                return null;
+            }
+
+            @Override
+            public MapValue getMap(int index) {
+                return null;
+            }
+
+            @Override
+            public String getString(int index) {
+                if (index == 0) {
+                    return "addFilePath_" + i;
+                }
+                return null;
+            }
+        };
+
+        return addFileRow;
+    }
+}