[NO ISSUE][RUN] Incorrect accounting of tuple size during build phase of join

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

Change-Id: Idd732642424e38892c4f48c5ab77cdb3c747be22
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17367
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 8051305..3645855 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -75,6 +75,15 @@
             throws HyracksDataException;
 
     /**
+     * Returns the number of frames needed to accommodate the tuple.
+     *
+     * @param tupleSize tuple size
+     * @param fieldCount field count. 0 if the tuple size already accounts for fields offsets size.
+     * @return the number of frames needed to accommodate the tuple.
+     */
+    int framesNeeded(int tupleSize, int fieldCount);
+
+    /**
      * Cancels the effect of last insertTuple() operation. i.e. undoes the last insertTuple() operation.
      */
     void cancelInsertTuple(int partition) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index 12985c0..613a396 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -98,4 +98,14 @@
             }
         };
     }
+
+    public String partitionsStatus() {
+        StringBuilder sb = new StringBuilder();
+        int numPartitions = bufferManager.getNumPartitions();
+        for (int p = 0; p < numPartitions; p++) {
+            sb.append("p:").append(p).append(",#t:").append(bufferManager.getNumTuples(p)).append(",s:")
+                    .append(spilledStatus.get(p)).append(",s:").append(bufferManager.getPhysicalSize(p)).append('\n');
+        }
+        return sb.toString();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index d3d06cb..722512a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -132,15 +132,15 @@
     @Override
     public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
             TuplePointer pointer) throws HyracksDataException {
-        int actualSize = calculateActualSize(fieldEndOffsets, size);
-        int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
+        int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length;
+        int fid = getLastBufferOrCreateNewIfNotExist(partition, size, fieldCount);
         if (fid < 0) {
             return false;
         }
         partitionArray[partition].getFrame(fid, tempInfo);
         int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
         if (tid < 0) {
-            fid = createNewBuffer(partition, actualSize);
+            fid = createNewBuffer(partition, size, fieldCount);
             if (fid < 0) {
                 return false;
             }
@@ -170,6 +170,12 @@
         numTuples[partition]--;
     }
 
+    @Override
+    public int framesNeeded(int tupleSize, int fieldCount) {
+        int minFrameSize = framePool.getMinFrameSize();
+        return FrameHelper.calcAlignedFrameSizeToStore(fieldCount, tupleSize, minFrameSize) / minFrameSize;
+    }
+
     public static int calculateActualSize(int[] fieldEndOffsets, int size) {
         if (fieldEndOffsets != null) {
             return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -189,8 +195,8 @@
         return externalFrameId / getNumPartitions();
     }
 
-    private int createNewBuffer(int partition, int size) throws HyracksDataException {
-        ByteBuffer newBuffer = requestNewBufferFromPool(size, partition);
+    private int createNewBuffer(int partition, int tupleSize, int fieldCount) throws HyracksDataException {
+        ByteBuffer newBuffer = requestNewBufferFromPool(tupleSize, partition, fieldCount);
         if (newBuffer == null) {
             return -1;
         }
@@ -199,9 +205,11 @@
         return partitionArray[partition].insertFrame(newBuffer);
     }
 
-    private ByteBuffer requestNewBufferFromPool(int recordSize, int partition) throws HyracksDataException {
-        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
-        if ((double) frameSize / (double) framePool.getMinFrameSize() + getPhysicalSize(partition) > constrain
+    private ByteBuffer requestNewBufferFromPool(int recordSize, int partition, int fieldCount)
+            throws HyracksDataException {
+        int minFrameSize = framePool.getMinFrameSize();
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(fieldCount, recordSize, minFrameSize);
+        if ((double) frameSize / (double) minFrameSize + getPhysicalSize(partition) / (double) minFrameSize > constrain
                 .frameLimit(partition)) {
             return null;
         }
@@ -238,10 +246,11 @@
         }
     }
 
-    private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
+    private int getLastBufferOrCreateNewIfNotExist(int partition, int tupleSize, int fieldCount)
+            throws HyracksDataException {
         if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
             partitionArray[partition] = new FrameBufferManager();
-            return createNewBuffer(partition, actualSize);
+            return createNewBuffer(partition, tupleSize, fieldCount);
         }
         return getLastBuffer(partition);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 7a9bb25..5f80165 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -49,6 +49,8 @@
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * This class mainly applies one level of HHJ on a pair of
@@ -56,6 +58,7 @@
  */
 public class OptimizedHybridHashJoin {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     // Used for special probe BigObject which can not be held into the Join memory
     private FrameTupleAppender bigFrameAppender;
 
@@ -152,19 +155,23 @@
     private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
         // insertTuple prevents the tuple to acquire a number of frames that is > the frame limit
         while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid));
-            double numFrames = (double) recordSize / (double) jobletCtx.getInitialFrameSize();
+            int numFrames = bufferManager.framesNeeded(accessorBuild.getTupleLength(tid), 0);
             int victimPartition;
-            if (numFrames > bufferManager.getConstrain().frameLimit(pid)
-                    || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) {
+            int partitionFrameLimit = bufferManager.getConstrain().frameLimit(pid);
+            if (numFrames > partitionFrameLimit || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) {
                 // insert request can never be satisfied
-                if (numFrames > memSizeInFrames || recordSize < jobletCtx.getInitialFrameSize()) {
-                    // the tuple is greater than the memory budget or although the record is small we could not find
-                    // a frame for it (possibly due to a bug)
+                if (numFrames > memSizeInFrames) {
+                    // the tuple is greater than the memory budget
+                    logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit);
                     throw HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY);
                 }
+                if (numFrames <= 1) {
+                    // this shouldn't happen. whether the partition is spilled or not, it should be able to get 1 frame
+                    logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit);
+                    throw new IllegalStateException("can't insert tuple in join memory");
+                }
                 // Record is large but insertion failed either 1) we could not satisfy the request because of the
-                // frame limit or 2) we could not find a victim anymore (exhaused all victims) and the partition is
+                // frame limit or 2) we could not find a victim anymore (exhausted all victims) and the partition is
                 // memory-resident with no frame.
                 flushBigObjectToDisk(pid, accessorBuild, tid, buildRFWriters, buildRelName);
                 spilledStatus.set(pid);
@@ -613,4 +620,14 @@
         }
         this.isReversed = reversed;
     }
+
+    private void logTupleInsertionFailure(int tid, int pid, int numFrames, int partitionFrameLimit) {
+        int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid));
+        String details = String.format(
+                "partition %s, tuple size %s, needed # frames %s, partition frame limit %s, join "
+                        + "memory in frames %s, initial frame size %s",
+                pid, recordSize, numFrames, partitionFrameLimit, memSizeInFrames, jobletCtx.getInitialFrameSize());
+        LOGGER.debug("can't insert tuple in join memory. {}", details);
+        LOGGER.debug("partitions status:\n{}", spillPolicy.partitionsStatus());
+    }
 }