[NO ISSUE][RUN] Incorrect accounting of tuple size during build phase of join
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Change-Id: Idd732642424e38892c4f48c5ab77cdb3c747be22
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17367
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
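Editor's note on the change, grounded in the diff below: the build phase previously compared a fractional byte ratio (recordSize / frameSize) against the partition frame limit, while the allocator can only grant whole frames; the fix rounds the requirement up to whole frames via the new framesNeeded() before any comparison. A minimal standalone sketch of that difference follows; the class name, sizes, and the simplified rounding rule are illustrative, not part of the patch.

```java
// Standalone sketch (hypothetical sizes): the old accounting compared a
// fractional recordSize/frameSize against the frame limit; the fix rounds
// up to whole frames first, which is what the allocator actually hands out.
public final class TupleSizeAccountingSketch {

    // Simplified stand-in for framesNeeded(): align up, then count frames.
    static int framesNeeded(int tupleSize, int minFrameSize) {
        return (tupleSize + minFrameSize - 1) / minFrameSize;
    }

    public static void main(String[] args) {
        int frameSize = 32 * 1024;        // hypothetical initial frame size
        int recordSize = frameSize + 100; // tuple slightly larger than one frame

        double oldNumFrames = (double) recordSize / frameSize;  // ~1.003
        int newNumFrames = framesNeeded(recordSize, frameSize); // 2

        // 2 is the value that must be checked against the frame limit and
        // the memory budget, since frames are granted whole.
        System.out.println(oldNumFrames + " -> " + newNumFrames);
    }
}
```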
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 8051305..3645855 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -75,6 +75,15 @@
throws HyracksDataException;
/**
+ * Returns the number of frames needed to accommodate the tuple.
+ *
+ * @param tupleSize tuple size
+ * @param fieldCount field count; 0 if the tuple size already accounts for the field offsets' size.
+ * @return the number of frames needed to accommodate the tuple.
+ */
+ int framesNeeded(int tupleSize, int fieldCount);
+
+ /**
* Cancels the effect of last insertTuple() operation. i.e. undoes the last insertTuple() operation.
*/
void cancelInsertTuple(int partition) throws HyracksDataException;
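As a reading aid for the framesNeeded() contract above: passing the raw data size with an explicit field count should agree with passing a size that already includes the offset slots together with fieldCount = 0. A standalone sketch under the assumption of 4-byte offset slots (the exact FrameHelper layout may differ):

```java
// Sketch of the framesNeeded() contract; the 4-byte offset slot per field
// is an illustrative assumption, not the exact Hyracks frame layout.
public final class FramesNeededContractSketch {

    static int framesNeeded(int tupleSize, int fieldCount, int minFrameSize) {
        int required = tupleSize + 4 * fieldCount; // add offset slots if not already included
        return (required + minFrameSize - 1) / minFrameSize;
    }

    public static void main(String[] args) {
        int minFrameSize = 32 * 1024;
        int dataSize = 40_000;
        int fieldCount = 8;
        int sizeWithOffsets = dataSize + 4 * fieldCount;
        // Both call styles should yield the same frame count.
        System.out.println(framesNeeded(dataSize, fieldCount, minFrameSize)
                == framesNeeded(sizeWithOffsets, 0, minFrameSize)); // true
    }
}
```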
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index 12985c0..613a396 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -98,4 +98,14 @@
}
};
}
+
+ public String partitionsStatus() {
+ StringBuilder sb = new StringBuilder();
+ int numPartitions = bufferManager.getNumPartitions();
+ for (int p = 0; p < numPartitions; p++) {
+            sb.append("p:").append(p).append(",#t:").append(bufferManager.getNumTuples(p)).append(",s:")
+                    .append(spilledStatus.get(p)).append(",size:").append(bufferManager.getPhysicalSize(p)).append('\n');
+ }
+ return sb.toString();
+ }
}
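To make the diagnostic output concrete, here is a standalone snippet that reproduces the status-string format with stand-in data; the real method reads the buffer manager and the spilled-status BitSet instead of these arrays.

```java
import java.util.BitSet;

// Renders the per-partition status format with made-up numbers.
public final class PartitionsStatusSketch {
    public static void main(String[] args) {
        int[] numTuples = { 10, 0, 7 };           // hypothetical tuple counts
        int[] physicalSize = { 65536, 0, 32768 }; // hypothetical bytes held
        BitSet spilled = new BitSet();
        spilled.set(2);                           // partition 2 has spilled
        StringBuilder sb = new StringBuilder();
        for (int p = 0; p < numTuples.length; p++) {
            sb.append("p:").append(p).append(",#t:").append(numTuples[p]).append(",s:")
                    .append(spilled.get(p)).append(",size:").append(physicalSize[p]).append('\n');
        }
        System.out.print(sb);
        // p:0,#t:10,s:false,size:65536
        // p:1,#t:0,s:false,size:0
        // p:2,#t:7,s:true,size:32768
    }
}
```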
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index d3d06cb..722512a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -132,15 +132,15 @@
@Override
public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
TuplePointer pointer) throws HyracksDataException {
- int actualSize = calculateActualSize(fieldEndOffsets, size);
- int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
+ int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length;
+ int fid = getLastBufferOrCreateNewIfNotExist(partition, size, fieldCount);
if (fid < 0) {
return false;
}
partitionArray[partition].getFrame(fid, tempInfo);
int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
if (tid < 0) {
- fid = createNewBuffer(partition, actualSize);
+ fid = createNewBuffer(partition, size, fieldCount);
if (fid < 0) {
return false;
}
@@ -170,6 +170,12 @@
numTuples[partition]--;
}
+ @Override
+ public int framesNeeded(int tupleSize, int fieldCount) {
+ int minFrameSize = framePool.getMinFrameSize();
+ return FrameHelper.calcAlignedFrameSizeToStore(fieldCount, tupleSize, minFrameSize) / minFrameSize;
+ }
+
public static int calculateActualSize(int[] fieldEndOffsets, int size) {
if (fieldEndOffsets != null) {
return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -189,8 +195,8 @@
return externalFrameId / getNumPartitions();
}
- private int createNewBuffer(int partition, int size) throws HyracksDataException {
- ByteBuffer newBuffer = requestNewBufferFromPool(size, partition);
+ private int createNewBuffer(int partition, int tupleSize, int fieldCount) throws HyracksDataException {
+ ByteBuffer newBuffer = requestNewBufferFromPool(tupleSize, partition, fieldCount);
if (newBuffer == null) {
return -1;
}
@@ -199,9 +205,11 @@
return partitionArray[partition].insertFrame(newBuffer);
}
- private ByteBuffer requestNewBufferFromPool(int recordSize, int partition) throws HyracksDataException {
- int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
- if ((double) frameSize / (double) framePool.getMinFrameSize() + getPhysicalSize(partition) > constrain
+ private ByteBuffer requestNewBufferFromPool(int recordSize, int partition, int fieldCount)
+ throws HyracksDataException {
+ int minFrameSize = framePool.getMinFrameSize();
+ int frameSize = FrameHelper.calcAlignedFrameSizeToStore(fieldCount, recordSize, minFrameSize);
+ if ((double) frameSize / (double) minFrameSize + getPhysicalSize(partition) / (double) minFrameSize > constrain
.frameLimit(partition)) {
return null;
}
@@ -238,10 +246,11 @@
}
}
- private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
+ private int getLastBufferOrCreateNewIfNotExist(int partition, int tupleSize, int fieldCount)
+ throws HyracksDataException {
if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
partitionArray[partition] = new FrameBufferManager();
- return createNewBuffer(partition, actualSize);
+ return createNewBuffer(partition, tupleSize, fieldCount);
}
return getLastBuffer(partition);
}
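The unit mismatch fixed in requestNewBufferFromPool() can be seen with a worked example: the old check added the partition's physical size in bytes to a frame count before comparing against a frame limit, while the fixed form converts both operands to frames. All numbers below are hypothetical.

```java
// Worked example of the corrected budget check; every value is made up.
public final class BudgetCheckSketch {
    public static void main(String[] args) {
        int minFrameSize = 32 * 1024; // minimum frame size
        int frameSize = 64 * 1024;    // aligned size the incoming tuple needs
        int physicalSize = 96 * 1024; // bytes the partition already holds
        int frameLimit = 5;           // partition budget, in frames

        // Old form mixed units: 2.0 frames + 98304 bytes, always over the limit.
        double oldLhs = (double) frameSize / minFrameSize + physicalSize;
        // Fixed form uses frames on both sides: 2.0 + 3.0 = 5.0.
        double newLhs = (double) frameSize / minFrameSize + physicalSize / (double) minFrameSize;

        System.out.println(oldLhs > frameLimit); // true: spurious rejection
        System.out.println(newLhs > frameLimit); // false: the request fits
    }
}
```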
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 7a9bb25..5f80165 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -49,6 +49,8 @@
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* This class mainly applies one level of HHJ on a pair of
@@ -56,6 +58,7 @@
*/
public class OptimizedHybridHashJoin {
+ private static final Logger LOGGER = LogManager.getLogger();
// Used for special probe BigObject which can not be held into the Join memory
private FrameTupleAppender bigFrameAppender;
@@ -152,19 +155,23 @@
private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
// insertTuple prevents the tuple from acquiring a number of frames that is > the frame limit
while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid));
- double numFrames = (double) recordSize / (double) jobletCtx.getInitialFrameSize();
+ int numFrames = bufferManager.framesNeeded(accessorBuild.getTupleLength(tid), 0);
int victimPartition;
- if (numFrames > bufferManager.getConstrain().frameLimit(pid)
- || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) {
+ int partitionFrameLimit = bufferManager.getConstrain().frameLimit(pid);
+ if (numFrames > partitionFrameLimit || (victimPartition = spillPolicy.selectVictimPartition(pid)) < 0) {
// insert request can never be satisfied
- if (numFrames > memSizeInFrames || recordSize < jobletCtx.getInitialFrameSize()) {
- // the tuple is greater than the memory budget or although the record is small we could not find
- // a frame for it (possibly due to a bug)
+ if (numFrames > memSizeInFrames) {
+ // the tuple is greater than the memory budget
+ logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit);
throw HyracksDataException.create(ErrorCode.INSUFFICIENT_MEMORY);
}
+ if (numFrames <= 1) {
+ // this shouldn't happen. whether the partition is spilled or not, it should be able to get 1 frame
+ logTupleInsertionFailure(tid, pid, numFrames, partitionFrameLimit);
+ throw new IllegalStateException("can't insert tuple in join memory");
+ }
// Record is large but insertion failed either 1) we could not satisfy the request because of the
- // frame limit or 2) we could not find a victim anymore (exhaused all victims) and the partition is
+ // frame limit or 2) we could not find a victim anymore (exhausted all victims) and the partition is
// memory-resident with no frame.
flushBigObjectToDisk(pid, accessorBuild, tid, buildRFWriters, buildRelName);
spilledStatus.set(pid);
@@ -613,4 +620,14 @@
}
this.isReversed = reversed;
}
+
+ private void logTupleInsertionFailure(int tid, int pid, int numFrames, int partitionFrameLimit) {
+ int recordSize = VPartitionTupleBufferManager.calculateActualSize(null, accessorBuild.getTupleLength(tid));
+ String details = String.format(
+ "partition %s, tuple size %s, needed # frames %s, partition frame limit %s, join "
+ + "memory in frames %s, initial frame size %s",
+ pid, recordSize, numFrames, partitionFrameLimit, memSizeInFrames, jobletCtx.getInitialFrameSize());
+ LOGGER.debug("can't insert tuple in join memory. {}", details);
+ LOGGER.debug("partitions status:\n{}", spillPolicy.partitionsStatus());
+ }
}
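For reference, this standalone snippet shows what the failure log line produced by the new logTupleInsertionFailure() helper looks like; the template is the one from the patch, while the values are made up in place of the join's runtime state.

```java
// Renders the failure message template with stand-in values.
public final class LogFormatSketch {
    public static void main(String[] args) {
        String details = String.format(
                "partition %s, tuple size %s, needed # frames %s, partition frame limit %s, join "
                        + "memory in frames %s, initial frame size %s",
                3, 200_000, 7, 4, 64, 32_768);
        System.out.println("can't insert tuple in join memory. " + details);
        // can't insert tuple in join memory. partition 3, tuple size 200000,
        // needed # frames 7, partition frame limit 4, join memory in frames 64,
        // initial frame size 32768
    }
}
```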