ASTERIXDB-1892: Sets a proper hash table cardinality during hash-group by
- Set a proper hash table cardinality during the merge phase
of the external hash group-by operator.
- Currently, the number of tuples in a spilled partition is
used as the hash table cardinality. And this can cause an issue
since compiler.groupmemory size is not considered.
- So, like the initial group-by build phase, the hash table
cardinality will be set properly based on the memory budget for
the group-by operator.
Change-Id: I651139b2b559ad4d2f6137a5c844814606516a90
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1702
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 6fdaec5..3c2912e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -81,16 +81,5 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
- <dependency>
- <groupId>com.e-movimento.tinytools</groupId>
- <artifactId>privilegedaccessor</artifactId>
- <version>1.2.2</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 8555ade..9e7daf0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -65,7 +65,6 @@
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
@@ -259,8 +258,8 @@
// Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
int memoryBudgetInBytes = context.getFrameSize() * frameLimit;
int groupByColumnsCount = gby.getGroupByList().size() + numFds;
- int hashTableSize = calculateGroupByTableCardinality(memoryBudgetInBytes, groupByColumnsCount,
- context.getFrameSize());
+ int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
+ groupByColumnsCount, context.getFrameSize());
ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
@@ -282,51 +281,4 @@
return true;
}
- /**
- * Based on a rough estimation of a tuple (each field size: 4 bytes) size and the number of possible hash values
- * for the given number of group-by columns, calculates the number of hash entries for the hash table in Group-by.
- * The formula is min(# of possible hash values, # of possible tuples in the data table).
- * This method assumes that the group-by table consists of hash table that stores hash value of tuple pointer
- * and data table actually stores the aggregated tuple.
- * For more details, refer to this JIRA issue: https://issues.apache.org/jira/browse/ASTERIXDB-1556
- *
- * @param memoryBudgetByteSize
- * @param numberOfGroupByColumns
- * @return group-by table size (the cardinality of group-by table)
- */
- public static int calculateGroupByTableCardinality(long memoryBudgetByteSize, int numberOfGroupByColumns,
- int frameSize) {
- // Estimates a minimum tuple size with n fields:
- // (4:tuple offset in a frame, 4n:each field offset in a tuple, 4n:each field size 4 bytes)
- int tupleByteSize = 4 + 8 * numberOfGroupByColumns;
-
- // Maximum number of tuples
- long maxNumberOfTuplesInDataTable = memoryBudgetByteSize / tupleByteSize;
-
- // To calculate possible hash values, this counts the number of bits.
- // We assume that each field consists of 4 bytes.
- // Also, too high range that is greater than Long.MAXVALUE (64 bits) is not necessary for our calculation.
- // And, this should not generate negative numbers when shifting the number.
- int numberOfBits = Math.min(61, numberOfGroupByColumns * 4 * 8);
-
- // Possible number of unique hash entries
- long possibleNumberOfHashEntries = 2L << numberOfBits;
-
- // Between # of entries in Data table and # of possible hash values, we choose the smaller one.
- long groupByTableCardinality = Math.min(possibleNumberOfHashEntries, maxNumberOfTuplesInDataTable);
- long groupByTableByteSize = SerializableHashTable.getExpectedTableByteSize(groupByTableCardinality, frameSize);
-
- // Gets the ratio of hash-table size in the total size (hash + data table).
- double hashTableRatio = (double) groupByTableByteSize / (groupByTableByteSize + memoryBudgetByteSize);
-
- // Gets the table size based on the ratio that we have calculated.
- long finalGroupByTableByteSize = (long) (hashTableRatio * memoryBudgetByteSize);
-
- long finalGroupByTableCardinality = finalGroupByTableByteSize
- / SerializableHashTable.getExpectedByteSizePerHashValue();
-
- // The maximum cardinality of a hash table: Integer.MAX_VALUE
- return finalGroupByTableCardinality > Integer.MAX_VALUE ? Integer.MAX_VALUE
- : (int) finalGroupByTableCardinality;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index 72a1bb6..0285069 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -76,6 +76,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>com.e-movimento.tinytools</groupId>
+ <artifactId>privilegedaccessor</artifactId>
+ <version>1.2.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 4e0724c..2d8433d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
+import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
/**
*
@@ -151,4 +152,51 @@
}
+ /**
+ * Based on a rough estimation of a tuple (each field size: 4 bytes) size and the number of possible hash values
+ * for the given number of group-by columns, calculates the number of hash entries for the hash table in Group-by.
+ * The formula is min(# of possible hash values, # of possible tuples in the data table).
+ * This method assumes that the group-by table consists of hash table that stores hash value of tuple pointer
+ * and data table actually stores the aggregated tuple.
+ * For more details, refer to this JIRA issue: https://issues.apache.org/jira/browse/ASTERIXDB-1556
+ *
+ * @param memoryBudgetByteSize
+ * @param numberOfGroupByColumns
+ * @return group-by table size (the cardinality of group-by table)
+ */
+ public static int calculateGroupByTableCardinality(long memoryBudgetByteSize, int numberOfGroupByColumns,
+ int frameSize) {
+ // Estimates a minimum tuple size with n fields:
+ // (4:tuple offset in a frame, 4n:each field offset in a tuple, 4n:each field size 4 bytes)
+ int tupleByteSize = 4 + 8 * numberOfGroupByColumns;
+
+ // Maximum number of tuples
+ long maxNumberOfTuplesInDataTable = memoryBudgetByteSize / tupleByteSize;
+
+ // To calculate possible hash values, this counts the number of bits.
+ // We assume that each field consists of 4 bytes.
+ // Also, too high range that is greater than Long.MAXVALUE (64 bits) is not necessary for our calculation.
+ // And, this should not generate negative numbers when shifting the number.
+ int numberOfBits = Math.min(61, numberOfGroupByColumns * 4 * 8);
+
+ // Possible number of unique hash entries
+ long possibleNumberOfHashEntries = 2L << numberOfBits;
+
+ // Between # of entries in Data table and # of possible hash values, we choose the smaller one.
+ long groupByTableCardinality = Math.min(possibleNumberOfHashEntries, maxNumberOfTuplesInDataTable);
+ long groupByTableByteSize = SerializableHashTable.getExpectedTableByteSize(groupByTableCardinality, frameSize);
+
+ // Gets the ratio of hash-table size in the total size (hash + data table).
+ double hashTableRatio = (double) groupByTableByteSize / (groupByTableByteSize + memoryBudgetByteSize);
+
+ // Gets the table size based on the ratio that we have calculated.
+ long finalGroupByTableByteSize = (long) (hashTableRatio * memoryBudgetByteSize);
+
+ long finalGroupByTableCardinality =
+ finalGroupByTableByteSize / SerializableHashTable.getExpectedByteSizePerHashValue();
+
+ // The maximum cardinality of a hash table: Integer.MAX_VALUE
+ return finalGroupByTableCardinality > Integer.MAX_VALUE ? Integer.MAX_VALUE
+ : (int) finalGroupByTableCardinality;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index b17215f..9a3668e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -120,7 +120,13 @@
for (int i = 0; i < runs.length; i++) {
if (runs[i] != null) {
- ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, numOfTuples[i],
+ // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
+ int memoryBudgetInBytes = ctx.getInitialFrameSize() * frameLimit;
+ int groupByColumnsCount = mergeGroupFields.length;
+ int hashTableCardinality = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(
+ memoryBudgetInBytes, groupByColumnsCount, ctx.getInitialFrameSize());
+ hashTableCardinality = (int) Math.min(hashTableCardinality, numOfTuples[i]);
+ ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, hashTableCardinality,
runs[i].getFileSize(), mergeGroupFields, groupByComparators, nmkComputer,
mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, frameLimit, level);
RunFileWriter[] runFileWriters = new RunFileWriter[partitionTable.getNumPartitions()];
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
index a633998..392aab5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
@@ -17,33 +17,24 @@
* under the License.
*/
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+package org.apache.hyracks.dataflow.std.group.external;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobSpecification;
import org.junit.Assert;
import org.junit.Test;
import junit.extensions.PA;
-public class ExternalGroupByPOperatorTest {
+public class ExternalGroupOperatorDescriptorTest {
@Test
public void testCalculateGroupByTableCardinality() throws Exception {
- // Creates a dummy variable and an expression that are needed by the operator. They are not used by this test.
- LogicalVariable v = new LogicalVariable(0);
- MutableObject<ILogicalExpression> e = new MutableObject<ILogicalExpression>(new VariableReferenceExpression(v));
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = new ArrayList<>();
- gbyList.add(new Pair<>(v, e));
- ExternalGroupByPOperator eGByOp = new ExternalGroupByPOperator(gbyList, 0, 0);
+ // Sets a dummy variable.
+ IOperatorDescriptorRegistry spec = new JobSpecification(32768);
+ ExternalGroupOperatorDescriptor eGByOp =
+ new ExternalGroupOperatorDescriptor(spec, 0, 0, null, 4, null, null, null, null, null, null, null);
// Test 1: compiler.groupmemory: 512 bytes, frame size: 256 bytes, with 1 column group-by
long memoryBudgetInBytes = 512;