ASTERIXDB-1628: Fixed an issue in External Hash Group By

 - The number of partitions in External Hash Group By is now
   properly calculated. Previously, the calculation could yield a
   single partition even when the input data (with the fudge factor
   applied) did not fit into the memory budget; at least two
   partitions are now created in that case. A standalone sketch of
   the revised calculation appears after the diff below.

Change-Id: I8901d2b64659fb0d2b97d73f45a9fe113232e860
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1144
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Taewoo Kim <wangsaeu@yahoo.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f08d27d..34fdc48 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -57,8 +57,8 @@
     }
 
     @Override
-    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize, long dataBytesSize,
-            final int[] keyFields, final IBinaryComparator[] comparators,
+    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize,
+            long inputDataBytesSize, final int[] keyFields, final IBinaryComparator[] comparators,
             final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
             final int seed) throws HyracksDataException {
@@ -86,11 +86,12 @@
         final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         //TODO(jf) research on the optimized partition size
-        final int numPartitions = getNumOfPartitions((int) (dataBytesSize / ctx.getInitialFrameSize()),
+        final int numPartitions = getNumOfPartitions((int) (inputDataBytesSize / ctx.getInitialFrameSize()),
                 framesLimit - 1);
         final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
         if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("create hashtable, table size:" + tableSize + " file size:" + dataBytesSize + "  partitions:"
+            LOGGER.fine(
+                    "create hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + "  partitions:"
                     + numPartitions);
         }
 
@@ -238,17 +239,23 @@
         };
     }
 
-    private int getNumOfPartitions(int nubmerOfFramesForData, int frameLimit) {
-        if (frameLimit > nubmerOfFramesForData) {
-            return 1; // all in memory, we will create a big partition
+    /**
+     * Calculates the number of partitions for the data table. The formula comes from Shapiro's paper -
+     * http://cs.stanford.edu/people/chrismre/cs345/rl/shapiro.pdf (see page 249 for details).
+     * If the required number of frames exceeds the number of available frames, we make sure that at least
+     * two partitions are created. Also, if the computed number of partitions exceeds the memory budget,
+     * we cannot give each partition at least one in-memory frame, so the final part of the method
+     * handles that case as well.
+     */
+    private int getNumOfPartitions(int numberOfInputFrames, int frameLimit) {
+        if (frameLimit >= numberOfInputFrames * FUDGE_FACTOR) {
+            return 1; // all in memory; we will create one big partition.
         }
         int numberOfPartitions = (int) (Math
-                .ceil((nubmerOfFramesForData * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
-        if (numberOfPartitions <= 0) {
-            numberOfPartitions = 1; //becomes in-memory hash
-        }
+                .ceil((numberOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
+        numberOfPartitions = Math.max(2, numberOfPartitions);
         if (numberOfPartitions > frameLimit) {
-            numberOfPartitions = (int) Math.ceil(Math.sqrt(nubmerOfFramesForData * FUDGE_FACTOR));
+            numberOfPartitions = (int) Math.ceil(Math.sqrt(numberOfInputFrames * FUDGE_FACTOR));
             return Math.max(2, Math.min(numberOfPartitions, frameLimit));
         }
         return numberOfPartitions;
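
For illustration only (not part of the patch): a minimal standalone sketch of the revised
partition-count logic. The class name PartitionCountSketch and the FUDGE_FACTOR value of 1.1
are assumptions made for this example; the real constant is defined elsewhere in
HashSpillableTableFactory.

public class PartitionCountSketch {
    // Placeholder value; the real FUDGE_FACTOR constant lives in HashSpillableTableFactory.
    private static final double FUDGE_FACTOR = 1.1;

    // Mirrors the revised getNumOfPartitions(): one partition when everything fits in memory,
    // otherwise at least two, with a sqrt-based fallback when the count would exceed the
    // frame budget (so each partition can still be assigned an in-memory frame).
    static int getNumOfPartitions(int numberOfInputFrames, int frameLimit) {
        if (frameLimit >= numberOfInputFrames * FUDGE_FACTOR) {
            return 1; // all in memory: one big partition
        }
        int numberOfPartitions =
                (int) Math.ceil((numberOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1));
        numberOfPartitions = Math.max(2, numberOfPartitions);
        if (numberOfPartitions > frameLimit) {
            numberOfPartitions = (int) Math.ceil(Math.sqrt(numberOfInputFrames * FUDGE_FACTOR));
            return Math.max(2, Math.min(numberOfPartitions, frameLimit));
        }
        return numberOfPartitions;
    }

    public static void main(String[] args) {
        System.out.println(getNumOfPartitions(500, 32)); // ceil((500*1.1 - 32) / 31) = 17
        System.out.println(getNumOfPartitions(30, 32));  // 30*1.1 = 33 > 32, so 2 (the corner case; previously 1)
        System.out.println(getNumOfPartitions(1000, 4)); // sqrt fallback, capped at the 4-frame budget: 4
    }
}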