ASTERIXDB-1628: Fixed partition calculation in External Hash Group By
- The number of partitions in External Hash Group By is now calculated
correctly: the in-memory check accounts for the fudge factor, and at
least two partitions are created whenever the input does not fit in memory.
Change-Id: I8901d2b64659fb0d2b97d73f45a9fe113232e860
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1144
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Taewoo Kim <wangsaeu@yahoo.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f08d27d..34fdc48 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -57,8 +57,8 @@
}
@Override
- public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize, long dataBytesSize,
- final int[] keyFields, final IBinaryComparator[] comparators,
+ public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize,
+ long inputDataBytesSize, final int[] keyFields, final IBinaryComparator[] comparators,
final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
final int seed) throws HyracksDataException {
@@ -86,11 +86,12 @@
final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
//TODO(jf) research on the optimized partition size
- final int numPartitions = getNumOfPartitions((int) (dataBytesSize / ctx.getInitialFrameSize()),
+ final int numPartitions = getNumOfPartitions((int) (inputDataBytesSize / ctx.getInitialFrameSize()),
framesLimit - 1);
final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("create hashtable, table size:" + tableSize + " file size:" + dataBytesSize + " partitions:"
+ LOGGER.fine(
+ "create hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + " partitions:"
+ numPartitions);
}
@@ -238,17 +239,23 @@
};
}
- private int getNumOfPartitions(int nubmerOfFramesForData, int frameLimit) {
- if (frameLimit > nubmerOfFramesForData) {
- return 1; // all in memory, we will create a big partition
+ /**
+ * Calculates the number of partitions for the data table. The formula comes from Shapiro's paper -
+ * http://cs.stanford.edu/people/chrismre/cs345/rl/shapiro.pdf (see page 249 for details).
+ * If the required number of frames exceeds the number of available frames, we make sure that
+ * at least two partitions are created. Also, if the number of partitions exceeds the memory budget,
+ * we may not be able to allocate at least one frame per partition in memory, so that case is
+ * handled at the end of the method as well.
+ */
+ private int getNumOfPartitions(int numberOfInputFrames, int frameLimit) {
+ if (frameLimit >= numberOfInputFrames * FUDGE_FACTOR) {
+ return 1; // all in memory, we will create a big partition.
}
int numberOfPartitions = (int) (Math
- .ceil((nubmerOfFramesForData * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
- if (numberOfPartitions <= 0) {
- numberOfPartitions = 1; //becomes in-memory hash
- }
+ .ceil((numberOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
+ numberOfPartitions = Math.max(2, numberOfPartitions);
if (numberOfPartitions > frameLimit) {
- numberOfPartitions = (int) Math.ceil(Math.sqrt(nubmerOfFramesForData * FUDGE_FACTOR));
+ numberOfPartitions = (int) Math.ceil(Math.sqrt(numberOfInputFrames * FUDGE_FACTOR));
return Math.max(2, Math.min(numberOfPartitions, frameLimit));
}
return numberOfPartitions;
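
For reference, the sketch below replays the new partition calculation outside of Hyracks so the corner case can be checked by hand. It is illustrative only: the class name, the main() driver, and the FUDGE_FACTOR value of 1.1 are assumptions, since the real constant is defined elsewhere in HashSpillableTableFactory and is not shown in this diff.

// Illustrative sketch of the new getNumOfPartitions() logic (not the actual Hyracks class).
public final class PartitionCountSketch {

    // Assumed value for illustration; the real FUDGE_FACTOR is defined in HashSpillableTableFactory.
    private static final double FUDGE_FACTOR = 1.1;

    static int getNumOfPartitions(int numberOfInputFrames, int frameLimit) {
        if (frameLimit >= numberOfInputFrames * FUDGE_FACTOR) {
            return 1; // everything fits in memory: one big partition
        }
        // Shapiro's hybrid-hash formula (p. 249 of the cited paper).
        int numberOfPartitions =
                (int) Math.ceil((numberOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1));
        // The data does not fit in memory, so never fall back to a single partition.
        numberOfPartitions = Math.max(2, numberOfPartitions);
        if (numberOfPartitions > frameLimit) {
            // Not enough frames to give each partition its own frame: use the square-root rule instead.
            numberOfPartitions = (int) Math.ceil(Math.sqrt(numberOfInputFrames * FUDGE_FACTOR));
            return Math.max(2, Math.min(numberOfPartitions, frameLimit));
        }
        return numberOfPartitions;
    }

    public static void main(String[] args) {
        // Corner case addressed by this change: input only slightly larger than the budget.
        // The old code compared frameLimit against the raw frame count without the fudge factor
        // and could end up with a single partition even though the data does not fit.
        System.out.println(getNumOfPartitions(32, 32)); // now at least 2
        System.out.println(getNumOfPartitions(10, 64)); // fits in memory -> 1
    }
}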