[ASTERIXDB-2623][FUN] Revert TPC-DS partitioning behavior to the library default
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To avoid unexpected behavior, this change reverts the partitioning
logic to the default behavior provided by the TPC-DS library.
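- For illustration only (not part of this patch): a minimal, stand-alone
sketch of letting the library pick each partition's rows via
Results.constructResults, as the patched reader now does. The class name,
table choice, and builder values below are hypothetical, and the exact
Session builder methods are assumed from the library version in use.

    // Illustrative sketch only; assumed API shape of com.teradata.tpcds,
    // hypothetical class name and parameter values.
    import java.util.List;

    import com.teradata.tpcds.Results;
    import com.teradata.tpcds.Session;
    import com.teradata.tpcds.Table;

    public class TpcdsDefaultPartitioningSketch {
        public static void main(String[] args) {
            // The library decides which rows belong to this chunk (partition);
            // no manual start/end row calculation as in the reverted code.
            Session session = Session.getDefaultSession()
                    .withParallelism(4)   // total number of partitions (illustrative)
                    .withChunkNumber(1);  // 1-based index of this partition (illustrative)
            Results results = Results.constructResults(Table.CALL_CENTER, session);
            for (List<List<String>> row : results) {
                System.out.println(row); // generated column values for one record
            }
        }
    }

With this revert, each partition only issues the constructResults call
shown above; the per-partition row range is no longer computed by hand.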
Change-Id: I0de179e33cd74dea333d163a108b1b7606b45643
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3516
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
index 1122a29..3d08c01 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSDataGeneratorReader.java
@@ -36,12 +36,11 @@
import com.teradata.tpcds.Table;
/**
- * Each partition will be running a TPCDS data generator reader instance. Depending on the number of partitions, the
- * data generator will parallelize its work based on the number of partitions. The reader is passed the parallelism
- * level based on the number of partition instances.
+ * Each partition will be running a TPCDS data generator reader instance. The data generator will parallelize its work
+ * based on the number of partitions available. The reader is passed the parallelism level based on the number of
+ * partition instances.
*
- * The function automatically handles generating the data for a single specified table or for all the tables. Also,
- * the parallelism will take place regardless of the selected data size to be generated.
+ * The function automatically handles generating the data for a single specified table or for all the tables.
*/
public class TPCDSDataGeneratorReader extends FunctionReader {
@@ -86,7 +85,7 @@
// Iterators for the tables to generate the data for
for (Table table : selectedTables) {
- Results result = calculateParallelism(table, session);
+ Results result = Results.constructResults(table, session);
tableIterators.add(result.iterator());
}
}
@@ -134,7 +133,7 @@
}
/**
- * Builds the string record from the generated values by the data generator. The column name for each value is
+ * Builds the string record from the generated values by the data generator. The field name for each value is
* extracted from the table from which the data is being generated.
*
* @param values List containing all the generated column values
@@ -223,62 +222,6 @@
}
/**
- * As the TPC-DS library has constraints on activating the parallelism (table must be generating 1,000,000 records
- * based on a scaling factor), we're gonna override that behavior and calculate the parallelism manually. This
- * will ensure the activation of the parallelism regardless of the data size being generated.
- *
- * @param table table to generate the data for
- * @param session session containing the parallelism and scaling information
- *
- * @return Results that holds a lazy-iterator to generate the data based on the calculated parameters.
- */
- private Results calculateParallelism(Table table, Session session) {
-
- // Total and parallelism level
- long total = session.getScaling().getRowCount(table);
- int parallelism = session.getParallelism();
-
- // Row set size to be generated for each partition
- long rowSetSize = total / parallelism;
-
- // Special case: WEB_SITE table sometimes relies on the previous records, this could be a problem if the
- // previous record is on a different thread. Since it's a small table, we'll generate it all on the first
- // thread and let the other threads generate nothing
- if (table.equals(Table.WEB_SITE)) {
- if (session.getChunkNumber() - 1 == 0) {
- return new Results(table, 1, total, session);
- }
- // Don't generate anything on other partition (start > end)
- else {
- return new Results(table, 2, 1, session);
- }
- }
-
- // Special case: For very small tables, if the rowSetSize ends up being 1, this will cause an issue in the
- // parallelism, so we'll just let the first thread do all the work
- if (rowSetSize == 1) {
- if (session.getChunkNumber() - 1 == 0) {
- return new Results(table, 1, total, session);
- }
- // Don't generate anything on other partition (start > end)
- else {
- return new Results(table, 2, 1, session);
- }
- }
-
- // Start and end calculated for each partition
- long startRow = (session.getChunkNumber() - 1) * rowSetSize + 1;
- long rowCount = startRow + rowSetSize - 1;
-
- // Any extra rows (not evenly divided) will be done by the last partition
- if (session.getChunkNumber() == parallelism) {
- rowCount += total % parallelism;
- }
-
- return new Results(table, startRow, rowCount, session);
- }
-
- /**
* Gets the function identifier
*
* @return function identifier