[ASTERIXDB-3547][EXT]: Use IDataPartitioningProvider for Location Constraints Configuration in Delta
- user model changes: no
- storage format changes: no
- interface changes: no
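Details:
- Configure the Delta scan's location constraints from the cluster
  locations supplied by IDataPartitioningProvider instead of trimming
  IClusterStateManager locations to the number of scan files.
- Expose distributeFiles() and the computed partition workloads so the
  file-distribution logic can be unit tested.
- Add unit tests for distributing scan files across partitions and
  runtime tests for Delta tables with one and nine data files.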
Ext-ref: MB-64800
Change-Id: I9ba2ce7afc54ddd08f5e522627c56e44f51cdbbc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19293
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c50b391..7e7b6f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -440,6 +440,10 @@
loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/delta_all_type/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/delta_all_type", PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
}
private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 1236636..67d460c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -55,6 +55,10 @@
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "modified_delta_table";
public static final String DELTA_MULTI_FILE_TABLE =
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "multiple_file_delta_table";
+ public static final String DELTA_FILE_SIZE_ONE =
+ "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one";
+ public static final String DELTA_FILE_SIZE_NINE =
+ "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
public static void prepareDeltaTableContainer(Configuration conf) {
File basePath = new File(".");
@@ -62,6 +66,8 @@
prepareMultipleFilesTable(conf);
prepareModifiedTable(conf);
prepareEmptyTable(conf);
+ prepareFileSizeOne(conf);
+ prepareFileSizeNine(conf);
}
public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -221,4 +227,102 @@
throw new RuntimeException(e);
}
}
+
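+    /** Creates a Delta table backed by a single one-record Parquet file. */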
+ public static void prepareFileSizeOne(Configuration conf) {
+ Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+ try {
+ Path path = new Path(DELTA_FILE_SIZE_ONE, "firstFile.parquet");
+ ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+
+ List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema));
+
+ fileFirstSnapshotRecords.get(0).put("id", 0);
+ fileFirstSnapshotRecords.get(0).put("name", "Cooper");
+
+ for (GenericData.Record record : fileFirstSnapshotRecords) {
+ writer.write(record);
+ }
+
+ long size = writer.getDataSize();
+ writer.close();
+
+ List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size,
+ System.currentTimeMillis(), true, null, null));
+ DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_ONE);
+ OptimisticTransaction txn = log.startTransaction();
+ Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+ .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+ .add(new StructField("name", new StringType(), true)))
+ .build();
+ txn.updateMetadata(metaData);
+ txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
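+    /**
+     * Creates a Delta table backed by nine one-record Parquet files committed across
+     * nine snapshots, for exercising distribution of files over multiple partitions.
+     */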
+ public static void prepareFileSizeNine(Configuration conf) {
+ Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name").endRecord();
+ try {
+ Path path = new Path(DELTA_FILE_SIZE_NINE, "firstFile.parquet");
+ ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+
+            // Build nine single-record snapshots, one per data file.
+            String[] names = { "Cooper", "Adam", "Third", "Fourth", "Five", "Six", "Seven", "Eight", "Nine" };
+            List<List<GenericData.Record>> allSnapshotRecords = new ArrayList<>();
+            for (int i = 0; i < names.length; i++) {
+                GenericData.Record record = new GenericData.Record(schema);
+                record.put("id", i);
+                record.put("name", names[i]);
+                allSnapshotRecords.add(List.of(record));
+            }
+
+            for (GenericData.Record record : allSnapshotRecords.get(0)) {
+                writer.write(record);
+            }
+
+ long size = writer.getDataSize();
+ writer.close();
+
+ List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size,
+ System.currentTimeMillis(), true, null, null));
+ DeltaLog log = DeltaLog.forTable(conf, DELTA_FILE_SIZE_NINE);
+ OptimisticTransaction txn = log.startTransaction();
+ Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+ .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+ .add(new StructField("name", new StringType(), true)))
+ .build();
+ txn.updateMetadata(metaData);
+ txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+ for (int i = 2; i <= 9; i++) {
+ Path path2 = new Path(DELTA_FILE_SIZE_NINE, "File" + i + ".parquet");
+ ParquetWriter<GenericData.Record> writer2 =
+ AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+
+ for (GenericData.Record record : allSnapshotRecords.get(i - 1)) {
+ writer2.write(record);
+ }
+
+ long size2 = writer2.getDataSize();
+ writer2.close();
+
+ List<Action> actions2 = List.of(new AddFile("File" + i + ".parquet", new HashMap<>(), size2,
+ System.currentTimeMillis(), true, null, null));
+
+ OptimisticTransaction txn2 = log.startTransaction();
+                txn2.commit(actions2, new Operation(Operation.Name.WRITE), "deltalake-table-write");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
new file mode 100644
index 0000000..c1a74c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_file_size_nine"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
new file mode 100644
index 0000000..db2abf5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT ELEMENT ds FROM DeltalakeDataset AS ds ORDER BY ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
new file mode 100644
index 0000000..1284e93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/delta_file_size_one"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
new file mode 100644
index 0000000..84e7914
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-file-one/deltalake-file-one.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT ELEMENT ds FROM DeltalakeDataset AS ds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
new file mode 100644
index 0000000..500f6a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-nine/deltalake-file-nine.01.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Cooper" }
+{ "id": 1, "name": "Adam" }
+{ "id": 2, "name": "Third" }
+{ "id": 3, "name": "Fourth" }
+{ "id": 4, "name": "Five" }
+{ "id": 5, "name": "Six" }
+{ "id": 6, "name": "Seven" }
+{ "id": 7, "name": "Eight" }
+{ "id": 8, "name": "Nine" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
new file mode 100644
index 0000000..006681c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-file-one/deltalake-file-one.01.adm
@@ -0,0 +1 @@
+{ "id": 0, "name": "Cooper" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ff1b325..c31cb5e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -556,6 +556,18 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-file-one">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-file-one</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-file-nine">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-file-nine</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
<placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index dc4c310..790db8c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -30,7 +30,6 @@
import java.util.PriorityQueue;
import java.util.Set;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -44,7 +43,6 @@
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -77,6 +75,11 @@
protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
protected ConfFactory confFactory;
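+    /** Visible for testing: the per-partition scan-file workloads computed by distributeFiles(). */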
+ public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
+ return partitionWorkLoadsBasedOnSize;
+ }
+
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
return locationConstraints;
@@ -134,9 +136,9 @@
scanFiles.add(row);
}
}
- locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+ locationConstraints = getPartitions(appCtx);
configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
- distributeFiles(scanFiles);
+ distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
issueWarnings(warnings, warningCollector);
}
@@ -151,26 +153,13 @@
warnings.clear();
}
- private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
- List<Row> scanFiles) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
-
- String[] locations = csm.getClusterLocations().getLocations();
- if (scanFiles.size() == 0) {
- return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
- } else if (locations.length > scanFiles.size()) {
- LOGGER.debug(
- "configured partitions ({}) exceeds total partition count ({}); limiting configured partitions to total partition count",
- locations.length, scanFiles.size());
- final String[] locationCopy = locations.clone();
- ArrayUtils.shuffle(locationCopy);
- locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
- }
- return new AlgebricksAbsolutePartitionConstraint(locations);
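+    /** Returns the cluster locations supplied by the data partitioning provider. */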
+ public AlgebricksAbsolutePartitionConstraint getPartitions(ICcApplicationContext appCtx) {
+ return appCtx.getDataPartitioningProvider().getClusterLocations();
}
- private void distributeFiles(List<Row> scanFiles) {
- final int partitionsCount = getPartitionConstraint().getLocations().length;
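+    /** Assigns each scan file to the currently smallest workload so total bytes per partition stay balanced. */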
+ public void distributeFiles(List<Row> scanFiles, int partitionsCount) {
PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaPartitionDistributionTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaPartitionDistributionTest.java
new file mode 100644
index 0000000..1bc8eb8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaPartitionDistributionTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.awss3;
+
+import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
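+/**
+ * Unit tests for the scan-file distribution logic of {@link DeltaReaderFactory}:
+ * every scan file must land in exactly one of the requested partition workloads.
+ */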
+public class DeltaPartitionDistributionTest {
+
+    @Test
+    public void distributeFilesMoreFilesThanPartitions() {
+        runDistributionTest(25, 13);
+    }
+
+    @Test
+    public void distributeFilesLessFilesThanPartitions() {
+        runDistributionTest(15, 23);
+    }
+
+    @Test
+    public void distributeFilesEqualFilesAndPartitions() {
+        runDistributionTest(9, 9);
+    }
+
+    /** Distributes rowCount mock scan files over numberOfPartitions workloads and verifies the result. */
+    private void runDistributionTest(int rowCount, int numberOfPartitions) {
+        List<Row> scanFiles = createMockRows(rowCount);
+        DeltaReaderFactory d = new DeltaReaderFactory() {
+            @Override
+            protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+                    throws AlgebricksException {
+                // no-op: job configuration is not exercised by distributeFiles
+            }
+
+            @Override
+            protected String getTablePath(Map<String, String> configuration) throws AlgebricksException {
+                return null;
+            }
+
+            @Override
+            public List<String> getRecordReaderNames() {
+                return null;
+            }
+        };
+        d.distributeFiles(scanFiles, numberOfPartitions);
+        Assert.assertEquals(numberOfPartitions, d.getPartitionWorkLoadsBasedOnSize().size());
+        verifyFileDistribution(scanFiles.size(), d.getPartitionWorkLoadsBasedOnSize());
+    }
+
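+    /** Verifies that the total number of files across all workloads equals the number of scan files. */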
+ private void verifyFileDistribution(int numberOfFiles,
+ List<DeltaReaderFactory.PartitionWorkLoadBasedOnSize> workloads) {
+ int totalDistributedFiles = 0;
+
+ for (DeltaReaderFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+ totalDistributedFiles += workload.getScanFiles().size();
+ Assert.assertTrue(workload.getTotalSize() >= 0);
+ }
+ Assert.assertEquals(numberOfFiles, totalDistributedFiles);
+ }
+
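+    /** Builds count mock scan-file rows; each exposes an AddFile struct at ADD_FILE_ORDINAL with a distinct size. */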
+ private List<Row> createMockRows(int count) {
+ List<Row> rows = new ArrayList<>();
+ StructType sch = createMockSchema();
+
+ for (int i = 1; i <= count; i++) {
+ int finalI = i;
+ Row row = new Row() {
+
+ @Override
+ public StructType getSchema() {
+ return sch;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return false;
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return 0;
+ }
+
+ @Override
+ public short getShort(int i) {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int i) {
+ if (i == 1) {
+ return finalI;
+ } else if (i == 2) {
+ return finalI * 10;
+ }
+ return 0;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return 0;
+ }
+
+ @Override
+ public String getString(int i) {
+ if (i == 0) {
+ return "tableRoot_" + finalI;
+ } else if (i == 1) {
+ return "addFilePath_" + finalI;
+ }
+ return null;
+ }
+
+ @Override
+ public BigDecimal getDecimal(int i) {
+ return null;
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return new byte[0];
+ }
+
+ @Override
+ public ArrayValue getArray(int i) {
+ return null;
+ }
+
+ @Override
+ public MapValue getMap(int i) {
+ return null;
+ }
+
+ @Override
+ public Row getStruct(int index) {
+ if (index == ADD_FILE_ORDINAL) {
+ return createAddFileEntry(finalI);
+ }
+ return null;
+ }
+ };
+
+ rows.add(row);
+ }
+
+ return rows;
+ }
+
+ private StructType createMockSchema() {
+ List<StructField> fields = new ArrayList<>();
+
+ fields.add(new StructField("field1", StringType.STRING, true));
+ fields.add(new StructField("field2", IntegerType.INTEGER, true));
+ fields.add(new StructField("field3", IntegerType.INTEGER, true));
+
+ return new StructType(fields);
+ }
+
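+    /** Builds a mock AddFile row: ordinal 0 holds the file path, ordinal 1 holds the file size (i * 100). */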
+ private Row createAddFileEntry(int i) {
+ List<StructField> addFileFields = new ArrayList<>();
+
+ addFileFields.add(new StructField("addFilePath", StringType.STRING, true));
+ addFileFields.add(new StructField("size", IntegerType.INTEGER, true));
+
+ StructType addFileSchema = new StructType(addFileFields);
+
+ Row addFileRow = new Row() {
+ @Override
+ public StructType getSchema() {
+ return addFileSchema;
+ }
+
+ @Override
+ public boolean isNullAt(int index) {
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return false;
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return 0;
+ }
+
+ @Override
+ public short getShort(int i) {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int index) {
+ if (index == 1) {
+ return i * 100;
+ }
+ return 0;
+ }
+
+ @Override
+ public long getLong(int i) {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return 0;
+ }
+
+ @Override
+ public BigDecimal getDecimal(int i) {
+ return null;
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return new byte[0];
+ }
+
+ @Override
+ public Row getStruct(int index) {
+ return null;
+ }
+
+ @Override
+ public ArrayValue getArray(int index) {
+ return null;
+ }
+
+ @Override
+ public MapValue getMap(int index) {
+ return null;
+ }
+
+ @Override
+ public String getString(int index) {
+ if (index == 0) {
+ return "addFilePath_" + i;
+ }
+ return null;
+ }
+ };
+
+ return addFileRow;
+ }
+}