[ASTERIXDB-2577][RT] One frame per spilled partition in optimized hybrid hash join

- user model changes: no
- storage format changes: no
- interface changes: no


Makes sure there is one frame for each spilled partition during the build close.
Checks the length of large record before flushing a record to the disk blindly
as being large.

Change-Id: I82d4da57e9a9835cab61dc5cc43c45c728e0c2b6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3485
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..731e7ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -168,7 +168,7 @@
-    private static int calculateActualSize(int[] fieldEndOffsets, int size) {
+    public static int calculateActualSize(int[] fieldEndOffsets, int size) {
         if (fieldEndOffsets != null) {
             return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -204,7 +204,7 @@
     private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
             throws HyracksDataException {
-        assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+        assert bufferInfo.getStartOffset() == 0 : "Haven't supported yet in FrameTupleAppender";
         if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
             appender.reset(appendFrame, false);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..7b6dcdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -62,46 +62,37 @@
-    private IHyracksTaskContext ctx;
+    private final IHyracksTaskContext ctx;
     private final String buildRelName;
     private final String probeRelName;
     private final ITuplePairComparator comparator;
     private final ITuplePartitionComputer buildHpc;
     private final ITuplePartitionComputer probeHpc;
     private final RecordDescriptor buildRd;
     private final RecordDescriptor probeRd;
-    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
-    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+    private final RunFileWriter[] buildRFWriters; //writing spilled build partitions
+    private final RunFileWriter[] probeRFWriters; //writing spilled probe partitions
     private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final IMissingWriter[] nonMatchWriters;
     private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
     private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
     private IPartitionedTupleBufferManager bufferManager;
     private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
-    private IDeallocatableFramePool framePool;
     private ISimpleFrameBufferManager bufferManagerForHashTable;
-    private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
+    // Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal.
+    private boolean isReversed;
     // stats information
     private int[] buildPSizeInTups;
     private IFrame reloadBuffer;
-    private TuplePointer tempPtr = new TuplePointer(); // this is a reusable object to store the pointer,which is not used anywhere.
-                                                       // we mainly use it to match the corresponding function signature.
+    // this is a reusable object to store the pointer,which is not used anywhere. we mainly use it to match the
+    // corresponding function signature.
+    private final TuplePointer tempPtr = new TuplePointer();
     private int[] probePSizeInTups;
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
@@ -117,20 +108,15 @@
         this.comparator = comparator;
         this.buildRelName = buildRelName;
         this.probeRelName = probeRelName;
         this.numOfPartitions = numOfPartitions;
         this.buildRFWriters = new RunFileWriter[numOfPartitions];
         this.probeRFWriters = new RunFileWriter[numOfPartitions];
         this.accessorBuild = new FrameTupleAccessor(buildRd);
         this.accessorProbe = new FrameTupleAccessor(probeRd);
         this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;
         this.spilledStatus = new BitSet(numOfPartitions);
         this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -140,7 +126,8 @@
     public void initBuild() throws HyracksDataException {
-        framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+        IDeallocatableFramePool framePool =
+                new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
         bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
         bufferManager = new VPartitionTupleBufferManager(
@@ -153,32 +140,27 @@
     public void build(ByteBuffer buffer) throws HyracksDataException {
         int tupleCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
-            processTuple(i, pid);
+            processTupleBuildPhase(i, pid);
-    private void processTuple(int tid, int pid) throws HyracksDataException {
+    private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
         while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            selectAndSpillVictim(pid);
+            int victimPartition = spillPolicy.selectVictimPartition(pid);
+            if (victimPartition < 0) {
+                throw new HyracksDataException(
+                        "No more space left in the memory buffer, please assign more memory to hash-join.");
+            }
+            spillPartition(victimPartition);
-    private void selectAndSpillVictim(int pid) throws HyracksDataException {
-        int victimPartition = spillPolicy.selectVictimPartition(pid);
-        if (victimPartition < 0) {
-            throw new HyracksDataException(
-                    "No more space left in the memory buffer, please assign more memory to hash-join.");
-        }
-        spillPartition(victimPartition);
-    }
     private void spillPartition(int pid) throws HyracksDataException {
-        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(buildRFWriters, buildRelName, pid);
         bufferManager.flushPartition(pid, writer);
@@ -191,19 +173,8 @@
-    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        String refName = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                refName = buildRelName;
-                break;
-            case PROBE:
-                refName = probeRelName;
-                runFileWriters = probeRFWriters;
-                break;
-        }
+    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(RunFileWriter[] runFileWriters, String refName,
+            int pid) throws HyracksDataException {
         RunFileWriter writer = runFileWriters[pid];
         if (writer == null) {
             FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
@@ -216,54 +187,55 @@
     public void closeBuild() throws HyracksDataException {
         // Flushes the remaining chunks of the all spilled partitions to the disk.
-        closeAllSpilledPartitions(SIDE.BUILD);
+        closeAllSpilledPartitions(buildRFWriters, buildRelName);
         // Makes the space for the in-memory hash table (some partitions may need to be spilled to the disk
         // during this step in order to make the space.)
         // and tries to bring back as many spilled partitions as possible if there is free space.
         int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
-        createInMemoryJoiner(inMemTupCount);
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
+        this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
+                predEvaluator, isReversed, bufferManagerForHashTable);
-        loadDataInMemJoin();
+        buildHashTable();
+    }
+    public void clearBuildTempFiles() throws HyracksDataException {
+        clearTempFiles(buildRFWriters);
+    }
+    public void clearProbeTempFiles() throws HyracksDataException {
+        clearTempFiles(probeRFWriters);
      * In case of failure happens, we need to clear up the generated temporary files.
-    public void clearBuildTempFiles() throws HyracksDataException {
-        for (int i = 0; i < buildRFWriters.length; i++) {
-            if (buildRFWriters[i] != null) {
-                buildRFWriters[i].erase();
+    private void clearTempFiles(RunFileWriter[] runFileWriters) throws HyracksDataException {
+        for (int i = 0; i < runFileWriters.length; i++) {
+            if (runFileWriters[i] != null) {
+                runFileWriters[i].erase();
-    private void closeAllSpilledPartitions(SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                break;
-            case PROBE:
-                runFileWriters = probeRFWriters;
-                break;
-        }
+    private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, String refName) throws HyracksDataException {
         try {
             for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
                     spilledStatus.nextSetBit(pid + 1)) {
                 if (bufferManager.getNumTuples(pid) > 0) {
-                    bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+                    bufferManager.flushPartition(pid,
+                            getSpillWriterOrCreateNewOneIfNotExist(runFileWriters, refName, pid));
         } finally {
             // Force to close all run file writers.
-            if (runFileWriters != null) {
-                for (RunFileWriter runFileWriter : runFileWriters) {
-                    if (runFileWriter != null) {
-                        runFileWriter.close();
-                    }
+            for (RunFileWriter runFileWriter : runFileWriters) {
+                if (runFileWriter != null) {
+                    runFileWriter.close();
@@ -278,93 +250,68 @@
      * @throws HyracksDataException
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
-        // we need number of |spilledPartitions| buffers to store the probe data
         int frameSize = ctx.getInitialFrameSize();
         long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
-        // For partitions in main memory, we deduct their size from the free space.
         int inMemTupCount = 0;
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
             inMemTupCount += buildPSizeInTups[p];
+        freeSpace -= SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
-        // Calculates the expected hash table size for the given number of tuples in main memory
-        // and deducts it from the free space.
-        long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
-        freeSpace -= hashTableByteSizeForInMemTuples;
+        return spillAndReloadPartitions(frameSize, freeSpace, inMemTupCount);
+    }
-        // In the case where free space is less than zero after considering the hash table size,
-        // we need to spill more partitions until we can accommodate the hash table in memory.
-        // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
-        //                                        hash table, or keep spilling from the first partition)
-        boolean moreSpilled = false;
-        // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
-        if (freeSpace < 0) {
-            // Tries to find a best-fit partition not to spill many partitions.
-            int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
+    private int spillAndReloadPartitions(int frameSize, long freeSpace, int inMemTupCount) throws HyracksDataException {
+        int pidToSpill, numberOfTuplesToBeSpilled;
+        long expectedHashTableSizeDecrease;
+        long currentFreeSpace = freeSpace;
+        int currentInMemTupCount = inMemTupCount;
+        // Spill some partitions if there is no free space.
+        while (currentFreeSpace < 0) {
+            pidToSpill = selectSinglePartitionToSpill(currentFreeSpace, currentInMemTupCount, frameSize);
             if (pidToSpill >= 0) {
-                // There is a suitable one. We spill that partition to the disk.
-                long hashTableSizeDecrease =
-                        -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-                                -buildPSizeInTups[pidToSpill], frameSize);
-                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
-                inMemTupCount -= buildPSizeInTups[pidToSpill];
+                numberOfTuplesToBeSpilled = buildPSizeInTups[pidToSpill];
+                expectedHashTableSizeDecrease =
+                        -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+                                -numberOfTuplesToBeSpilled, frameSize);
+                currentInMemTupCount -= numberOfTuplesToBeSpilled;
+                currentFreeSpace +=
+                        bufferManager.getPhysicalSize(pidToSpill) + expectedHashTableSizeDecrease - frameSize;
-                moreSpilled = true;
             } else {
-                // There is no single suitable partition. So, we need to spill multiple partitions to the disk
-                // in order to accommodate the hash table.
-                for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
-                        spilledStatus.nextClearBit(p + 1)) {
-                    int spaceToBeReturned = bufferManager.getPhysicalSize(p);
-                    int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
-                    if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
-                        continue;
-                    }
-                    spillPartition(p);
-                    closeBuildPartition(p);
-                    moreSpilled = true;
-                    // Since the number of tuples in memory has been decreased,
-                    // the hash table size will be decreased, too.
-                    // We put minus since the method returns a negative value to represent a newly reclaimed space.
-                    long expectedHashTableSizeDecrease =
-                            -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-                                    -numberOfTuplesToBeSpilled, frameSize);
-                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
-                    // Adjusts the hash table size
-                    inMemTupCount -= numberOfTuplesToBeSpilled;
-                    if (freeSpace >= 0) {
-                        break;
-                    }
-                }
+                throw new HyracksDataException("Hash join does not have enough memory even after spilling.");
+        // Bring some partitions back in if there is enough space.
+        return bringPartitionsBack(currentFreeSpace, currentInMemTupCount, frameSize);
+    }
-        // If more partitions have been spilled to the disk, calculate the expected hash table size again
-        // before bringing some partitions to main memory.
-        if (moreSpilled) {
-            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
-        }
-        // Brings back some partitions if there is enough free space.
+    /**
+     * Brings back some partitions if there is free memory and partitions that fit in that space.
+     *
+     * @param freeSpace current amount of free space in memory
+     * @param inMemTupCount number of in memory tuples
+     * @return number of in memory tuples after bringing some (or none) partitions in memory.
+     * @throws HyracksDataException
+     */
+    private int bringPartitionsBack(long freeSpace, int inMemTupCount, int frameSize) throws HyracksDataException {
         int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
-            if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
-                break;
-            }
-            long expectedHashTableByteSizeIncrease = SerializableHashTable
-                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
-            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
-            inMemTupCount += buildPSizeInTups[pid];
-            // Adjusts the hash table size
-            hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
+        int currentMemoryTupleCount = inMemTupCount;
+        long currentFreeSpace = freeSpace;
+        while ((pid = selectAPartitionToReload(currentFreeSpace, pid, currentMemoryTupleCount)) >= 0
+                && loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+            currentMemoryTupleCount += buildPSizeInTups[pid];
+            // Reserve space for loaded data & increase in hash table (give back one frame taken by spilled partition.)
+            currentFreeSpace = currentFreeSpace
+                    - bufferManager.getPhysicalSize(pid) - SerializableHashTable
+                            .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize)
+                    + frameSize;
-        return inMemTupCount;
+        return currentMemoryTupleCount;
@@ -376,14 +323,18 @@
         long spaceAfterSpill;
         long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
         int minSpaceAfterSpillPartID = -1;
+        int nextAvailablePidToSpill = -1;
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
             if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
+            if (nextAvailablePidToSpill < 0) {
+                nextAvailablePidToSpill = p;
+            }
             // We put minus since the method returns a negative value to represent a newly reclaimed space.
-            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+            // One frame is deducted since a spilled partition needs one frame from free space.
+            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) - frameSize + (-SerializableHashTable
                     .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
             if (spaceAfterSpill == 0) {
                 // Found the perfect one. Just returns this partition.
@@ -394,18 +345,30 @@
                 minSpaceAfterSpillPartID = p;
-        return minSpaceAfterSpillPartID;
+        return minSpaceAfterSpillPartID >= 0 ? minSpaceAfterSpillPartID : nextAvailablePidToSpill;
-    private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
-                spilledStatus.nextSetBit(i + 1)) {
-            int spilledTupleCount = buildPSizeInTups[i];
-            // Expected hash table size increase after reloading this partition
-            long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
-                    inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
-            if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
-                return i;
+    /**
+     * Finds a partition that can fit in the left over memory.
+     * @param freeSpace current free space
+     * @param inMemTupCount number of tuples currently in memory
+     * @return partition id of selected partition to reload
+     */
+    private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
+        int frameSize = ctx.getInitialFrameSize();
+        // Add one frame to freeSpace to consider the one frame reserved for the spilled partition
+        long totalFreeSpace = freeSpace + frameSize;
+        if (totalFreeSpace > 0) {
+            for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
+                    spilledStatus.nextSetBit(i + 1)) {
+                int spilledTupleCount = buildPSizeInTups[i];
+                // Expected hash table size increase after reloading this partition
+                long expectedHashTableByteSizeIncrease = SerializableHashTable
+                        .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, spilledTupleCount, frameSize);
+                if (totalFreeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
+                    return i;
+                }
         return -1;
@@ -421,13 +384,11 @@
             while (r.nextFrame(reloadBuffer)) {
                 for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
-                    if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                        continue;
+                    if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        // for some reason (e.g. fragmentation) if inserting fails, we need to clear the occupied frames
+                        bufferManager.clearPartition(pid);
+                        return false;
-                    // for some reason (e.g. due to fragmentation) if the inserting failed,
-                    // we need to clear the occupied frames
-                    bufferManager.clearPartition(pid);
-                    return false;
             // Closes and deletes the run file if it is already loaded into memory.
@@ -440,21 +401,14 @@
         return true;
-    private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
-        this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
-                predEvaluator, isReversed, bufferManagerForHashTable);
-    }
-    private void loadDataInMemJoin() throws HyracksDataException {
+    private void buildHashTable() throws HyracksDataException {
         for (int pid = 0; pid < numOfPartitions; pid++) {
             if (!spilledStatus.get(pid)) {
                 bufferManager.flushPartition(pid, new IFrameWriter() {
-                    public void open() throws HyracksDataException {
+                    public void open() {
+                        // Only nextFrame method is needed to pass the frame to the next operator.
@@ -463,24 +417,21 @@
-                    public void fail() throws HyracksDataException {
+                    public void fail() {
+                        // Only nextFrame method is needed to pass the frame to the next operator.
-                    public void close() throws HyracksDataException {
+                    public void close() {
+                        // Only nextFrame method is needed to pass the frame to the next operator.
-    public void initProbe() throws HyracksDataException {
+    public void initProbe() {
         probePSizeInTups = new int[numOfPartitions];
-        probeRFWriters = new RunFileWriter[numOfPartitions];
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -497,19 +448,7 @@
             if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
                 if (spilledStatus.get(pid)) { //pid is Spilled
-                    while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
-                        int victim = pid;
-                        if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one
-                            victim = spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
-                        }
-                        if (victim < 0) { // current tuple is too big for all the free space
-                            flushBigProbeObjectToDisk(pid, accessorProbe, i);
-                            break;
-                        }
-                        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
-                        bufferManager.flushPartition(victim, runFileWriter);
-                        bufferManager.clearPartition(victim);
-                    }
+                    processTupleProbePhase(i, pid);
                 } else { //pid is Resident
                     inMemJoiner.join(i, writer);
@@ -518,12 +457,44 @@
+    private void processTupleProbePhase(int tupleId, int pid) throws HyracksDataException {
+        if (!bufferManager.insertTuple(pid, accessorProbe, tupleId, tempPtr)) {
+            int recordSize =
+                    VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
+            // If the partition is at least half-full and insertion fails, that partition is preferred to get
+            // spilled, otherwise the biggest partition gets chosen as the victim.
+            boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+            int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
+                    : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
+            // This method is called for the spilled partitions, so we know that this tuple is going to get written to
+            // disk, sooner or later. As such, we try to reduce the number of writes that happens. So if the record is
+            // larger than the size of the victim partition, we just flush it to the disk, otherwise we spill the
+            // victim and this time insertion should be successful.
+            //TODO:(More optimization) There might be a case where the leftover memory in the last frame of the
+            // current partition + free memory in buffer manager + memory from the victim would be sufficient to hold
+            // the record.
+            if (victim >= 0 && bufferManager.getPhysicalSize(victim) >= recordSize) {
+                RunFileWriter runFileWriter =
+                        getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, victim);
+                bufferManager.flushPartition(victim, runFileWriter);
+                bufferManager.clearPartition(victim);
+                if (!bufferManager.insertTuple(pid, accessorProbe, tupleId, tempPtr)) {
+                    // This should not happen if the size calculations are correct, just not to let the query fail.
+                    flushBigProbeObjectToDisk(pid, accessorProbe, tupleId);
+                }
+            } else {
+                flushBigProbeObjectToDisk(pid, accessorProbe, tupleId);
+            }
+        }
+    }
     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
             throws HyracksDataException {
         if (bigProbeFrameAppender == null) {
             bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
-        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
         if (!bigProbeFrameAppender.append(accessorProbe, i)) {
             throw new HyracksDataException("The given tuple is too big");
@@ -542,55 +513,42 @@
     public void releaseResource() throws HyracksDataException {
-        closeAllSpilledPartitions(SIDE.PROBE);
+        closeAllSpilledPartitions(probeRFWriters, probeRelName);
         inMemJoiner = null;
         bufferManager = null;
         bufferManagerForHashTable = null;
-    /**
-     * In case of failure happens, we need to clear up the generated temporary files.
-     */
-    public void clearProbeTempFiles() throws HyracksDataException {
-        for (int i = 0; i < probeRFWriters.length; i++) {
-            if (probeRFWriters[i] != null) {
-                probeRFWriters[i].erase();
-            }
-        }
-    }
     public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
+        return buildRFWriters[pid] == null ? null : buildRFWriters[pid].createDeleteOnCloseReader();
     public int getBuildPartitionSizeInTup(int pid) {
-        return (buildPSizeInTups[pid]);
+        return buildPSizeInTups[pid];
     public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
+        return probeRFWriters[pid] == null ? null : probeRFWriters[pid].createDeleteOnCloseReader();
     public int getProbePartitionSizeInTup(int pid) {
-        return (probePSizeInTups[pid]);
+        return probePSizeInTups[pid];
     public int getMaxBuildPartitionSize() {
-        int max = buildPSizeInTups[0];
-        for (int i = 1; i < buildPSizeInTups.length; i++) {
-            if (buildPSizeInTups[i] > max) {
-                max = buildPSizeInTups[i];
-            }
-        }
-        return max;
+        return getMaxPartitionSize(buildPSizeInTups);
     public int getMaxProbePartitionSize() {
-        int max = probePSizeInTups[0];
-        for (int i = 1; i < probePSizeInTups.length; i++) {
-            if (probePSizeInTups[i] > max) {
-                max = probePSizeInTups[i];
+        return getMaxPartitionSize(probePSizeInTups);
+    }
+    private int getMaxPartitionSize(int[] partitions) {
+        int max = partitions[0];
+        for (int i = 1; i < partitions.length; i++) {
+            if (partitions[i] > max) {
+                max = partitions[i];
         return max;
@@ -600,6 +558,10 @@
         return spilledStatus;
+    public int getPartitionSize(int pid) {
+        return bufferManager.getPhysicalSize(pid);
+    }
     public void setIsReversed(boolean b) {
         this.isReversed = b;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index e6da7c9..71e5fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -48,15 +48,16 @@
             ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold) throws HyracksDataException {
         super(tableSize, ctx, false);
         this.bufferManager = bufferManager;
-        ByteBuffer newFrame = getFrame(frameSize);
-        if (newFrame == null) {
-            throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+        if (tableSize > 0) {
+            ByteBuffer newFrame = getFrame(frameSize);
+            if (newFrame == null) {
+                throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+            }
+            IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+            frameCapacity = frame.capacity();
+            contents.add(frame);
+            currentOffsetInEachFrameList.add(0);
-        IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
-        frameCapacity = frame.capacity();
-        contents.add(frame);
-        currentOffsetInEachFrameList.add(0);
         this.garbageCollectionThreshold = garbageCollectionThreshold;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
new file mode 100644
index 0000000..4c6b70f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -0,0 +1,240 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoin;
+import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+public class OptimizedHybridHashJoinTest {
+    int frameSize = 32768;
+    int totalNumberOfFrames = 10;
+    IHyracksTaskContext ctx = TestUtils.create(frameSize);
+    OptimizedHybridHashJoin hhj;
+    static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
+    static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
+    static String probeRelName = "RelR";
+    static String buildRelName = "RelS";
+    static ITuplePairComparator comparator;
+    static RecordDescriptor probeRd;
+    static RecordDescriptor buildRd;
+    static ITuplePartitionComputer probeHpc;
+    static ITuplePartitionComputer buildHpc;
+    static IPredicateEvaluator predEval;
+    int memSizeInFrames = -1;
+    int numOfPartitions = -1;
+    boolean isLeftOuter = false;
+    static int[] probeKeys = { 0 };
+    static int[] buildKeys = { 0 };
+    private final Random rnd = new Random(50);
+    @BeforeClass
+    public static void classSetUp() {
+        comparator = Mockito.mock(ITuplePairComparator.class);
+        probeHpc = new FieldHashPartitionComputerFamily(probeKeys, propHashFunctionFactories).createPartitioner(0);
+        buildHpc = new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories).createPartitioner(0);
+    }
+    @Test
+    public void SmallRecords_AllRelationsInMemory() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateIntFrame(frame);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        memSizeInFrames = 55;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    @Test
+    public void SmallRecords_SomeRelationsInMemory() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateIntFrame(frame);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        memSizeInFrames = 40;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    @Test
+    public void SmallRecords_AllRelationsSpill() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateIntFrame(frame);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        memSizeInFrames = 5;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    @Test
+    public void LargeRecords_AllRelationsInMemory() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        memSizeInFrames = 55;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    @Test
+    public void LargeRecords_SomeRelationsInMemory() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        memSizeInFrames = 40;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    @Test
+    public void LargeRecords_AllRelationsSpill() throws HyracksDataException {
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+        probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        memSizeInFrames = 5;
+        numOfPartitions = 5;
+        testJoin(memSizeInFrames, numOfPartitions, frame);
+    }
+    private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
+        hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
+                probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+        hhj.initBuild();
+        for (int i = 0; i < totalNumberOfFrames; i++) {
+            hhj.build(frame.getBuffer());
+        }
+        hhj.closeBuild();
+        checkOneFrameReservedPerSpilledPartitions();
+        IFrameWriter writer = new IFrameWriter() {
+            @Override
+            public void open() throws HyracksDataException {
+                //Not implemented as this method is only used to pass the frames of the given partition
+                //to the in memory joiner. As such, only next frame is important.
+            }
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            }
+            @Override
+            public void fail() throws HyracksDataException {
+                //Not implemented as this method is only used to pass the frames of the given partition
+                //to the in memory joiner. As such, only next frame is important.
+            }
+            @Override
+            public void close() throws HyracksDataException {
+                //Not implemented as this method is only used to pass the frames of the given partition
+                //to the in memory joiner. As such, only next frame is important.
+            }
+        };
+        hhj.initProbe();
+        for (int i = 0; i < totalNumberOfFrames; i++) {
+            hhj.probe(frame.getBuffer(), writer);
+            checkOneFrameReservedPerSpilledPartitions();
+        }
+    }
+    private void checkOneFrameReservedPerSpilledPartitions() throws HyracksDataException {
+        //Make sure that there is one frame reserved for each spilled partition.
+        int frameSize = ctx.getInitialFrameSize();
+        int totalSize = 0;
+        int inMemTuples = 0;
+        BitSet spilledStatus = hhj.getPartitionStatus();
+        for (int i = spilledStatus.nextClearBit(0); i >= 0 && i < numOfPartitions; i =
+                spilledStatus.nextClearBit(i + 1)) {
+            totalSize += hhj.getPartitionSize(i);
+            inMemTuples += hhj.getBuildPartitionSizeInTup(i);
+        }
+        if (memSizeInFrames * frameSize - totalSize
+                - SerializableHashTable.getExpectedTableByteSize(inMemTuples, frameSize) < spilledStatus.cardinality()
+                        * frameSize) {
+            throw new HyracksDataException("There should be at least one frame reserved for each spilled partition.");
+        }
+    }
+    private void generateIntFrame(VSizeFrame frame) throws HyracksDataException {
+        int fieldCount = 1;
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+        while (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            TupleUtils.createIntegerTuple(tb, tuple, rnd.nextInt());
+            tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+        }
+    }
+    private void generateStringFrame(VSizeFrame frame, int length) throws HyracksDataException {
+        int fieldCount = 1;
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+        ISerializerDeserializer[] fieldSerdes = { new UTF8StringSerializerDeserializer() };
+        String data = "";
+        for (int i = 0; i < length; i++) {
+            data += "X";
+        }
+        TupleUtils.createTuple(tb, tuple, fieldSerdes, data);
+        tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+        appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+    }