Addressing Review Comments for the Hybrid Hash Join

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@764 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 375f45e..8f2bb81 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -1,6 +1,5 @@
 package edu.uci.ics.hyracks.dataflow.std.join;
 
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -23,119 +22,118 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
+/**
+ * @author pouria
+       This class mainly applies one level of HHJ on a pair of
+       relations. It is always called by the descriptor.
+ */
 public class OptimizedHybridHashJoin {
-	
-	private final int NO_MORE_FREE_BUFFER = -1;
+
+    private final int NO_MORE_FREE_BUFFER = -1;
     private final int END_OF_PARTITION = -1;
     private final int INVALID_BUFFER = -2;
     private final int UNALLOCATED_FRAME = -3;
     private final int BUFFER_FOR_RESIDENT_PARTS = -1;
-	
-    IHyracksTaskContext ctx;
-	
-	private final String rel0Name;
-	private final String rel1Name;
-	
-	private final int[] buildKeys;
+
+    private IHyracksTaskContext ctx;
+
+    private final String rel0Name;
+    private final String rel1Name;
+
+    private final int[] buildKeys;
     private final int[] probeKeys;
-	
-    final IBinaryComparator[] comparators;
-    
-	private ITuplePartitionComputer buildHpc;
-	private ITuplePartitionComputer probeHpc;
-	 
-	final RecordDescriptor buildRd;
-    final RecordDescriptor probeRd;
-    
-    private RunFileWriter[] buildRFWriters;			//writing spilled build partitions
-    private RunFileWriter[] probeRFWriters;			//writing spilled probe partitions
-    
+
+    private final IBinaryComparator[] comparators;
+
+    private ITuplePartitionComputer buildHpc;
+    private ITuplePartitionComputer probeHpc;
+
+    private final RecordDescriptor buildRd;
+    private final RecordDescriptor probeRd;
+
+    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
     private final boolean isLeftOuter;
     private final INullWriter[] nullWriters1;
-    
-    private ByteBuffer[] memBuffs;			//Memory buffers for build
-    private int[] curPBuff;    				//Current (last) Buffer for each partition
-    private int[] nextBuff;					//Next buffer in the partition's buffer chain
-    private int[] buildPSizeInTups;			//Size of build partitions (in tuples)
-    private int[] probePSizeInTups;			//Size of probe partitions (in tuples)
-    private int nextFreeBuffIx;				//Index of next available free buffer to allocate/use
-    private BitSet pStatus;     			//0=resident, 1=spilled
-    private int numOfPartitions;			
+
+    private ByteBuffer[] memBuffs; //Memory buffers for build
+    private int[] curPBuff; //Current (last) Buffer for each partition
+    private int[] nextBuff; //Next buffer in the partition's buffer chain
+    private int[] buildPSizeInTups; //Size of build partitions (in tuples)
+    private int[] probePSizeInTups; //Size of probe partitions (in tuples)
+    private int nextFreeBuffIx; //Index of next available free buffer to allocate/use
+    private BitSet pStatus; //0=resident, 1=spilled
+    private int numOfPartitions;
     private int memForJoin;
-    private InMemoryHashJoin inMemJoiner;	//Used for joining resident partitions
-    
+    private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
+
     private final FrameTupleAccessor accessorBuild;
-	private final FrameTupleAccessor accessorProbe;
+    private final FrameTupleAccessor accessorProbe;
     private FrameTupleAppender buildTupAppender;
     private FrameTupleAppender probeTupAppenderToResident;
     private FrameTupleAppender probeTupAppenderToSpilled;
-    
+
     private int numOfSpilledParts;
-    private ByteBuffer[] sPartBuffs;	//Buffers for probe spilled partitions (one buffer per spilled partition)
-    private ByteBuffer probeResBuff;	//Buffer for probe resident partition tuples
-    
-    
-    private int[] buildPSizeInFrames;	//Used for partition tuning
-    private int freeFramesCounter;		//Used for partition tuning
-   
-    
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin,
-    								int numOfPartitions, String rel0Name, String rel1Name,
-    									int[] keys0, int[] keys1, IBinaryComparator[] comparators,
-    										RecordDescriptor buildRd, RecordDescriptor probeRd,
-    											ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc){
-    	this.ctx = ctx;
-    	this.memForJoin = memForJoin;
-    	this.buildRd = buildRd;
-    	this.probeRd = probeRd;
-    	this.buildHpc = probeHpc;
-    	this.probeHpc = buildHpc;
-    	this.buildKeys = keys0;
-    	this.probeKeys = keys1;
-    	this.comparators = comparators;
-    	this.rel0Name = rel0Name;
-    	this.rel1Name = rel1Name;
+    private ByteBuffer[] sPartBuffs;    //Buffers for probe spilled partitions (one buffer per spilled partition)
+    private ByteBuffer probeResBuff;    //Buffer for probe resident partition tuples
+    private ByteBuffer reloadBuffer;    //Buffer for reloading spilled partitions during partition tuning 
 
-    	this.numOfPartitions = numOfPartitions;
-    	this.buildRFWriters = new RunFileWriter[numOfPartitions];
-    	this.probeRFWriters = new RunFileWriter[numOfPartitions];
+    private int[] buildPSizeInFrames; //Used for partition tuning
+    private int freeFramesCounter; //Used for partition tuning
 
-    	this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
-    	this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
+            String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
+            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
+        this.ctx = ctx;
+        this.memForJoin = memForJoin;
+        this.buildRd = buildRd;
+        this.probeRd = probeRd;
+        this.buildHpc = probeHpc;
+        this.probeHpc = buildHpc;
+        this.buildKeys = keys0;
+        this.probeKeys = keys1;
+        this.comparators = comparators;
+        this.rel0Name = rel0Name;
+        this.rel1Name = rel1Name;
 
-    	this.isLeftOuter = false;
-    	this.nullWriters1 = null;
-    	
+        this.numOfPartitions = numOfPartitions;
+        this.buildRFWriters = new RunFileWriter[numOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numOfPartitions];
+
+        this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
+        this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+
+        this.isLeftOuter = false;
+        this.nullWriters1 = null;
+
     }
-    
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin,
-    								int numOfPartitions, String rel0Name, String rel1Name,
-    									int[] keys0, int[] keys1, IBinaryComparator[] comparators,
-    										RecordDescriptor buildRd, RecordDescriptor probeRd,
-    											ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-    												boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1){
-    	this.ctx = ctx;
-    	this.memForJoin = memForJoin;
-    	this.buildRd = buildRd;
-    	this.probeRd = probeRd;
-    	this.buildHpc = probeHpc;
-    	this.probeHpc = buildHpc;
-    	this.buildKeys = keys0;
-    	this.probeKeys = keys1;
-    	this.comparators = comparators;
-    	this.rel0Name = rel0Name;
-    	this.rel1Name = rel1Name;
 
-    	this.numOfPartitions = numOfPartitions;
-    	this.buildRFWriters = new RunFileWriter[numOfPartitions];
-    	this.probeRFWriters = new RunFileWriter[numOfPartitions];
+    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
+            String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
+            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
+            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+        this.ctx = ctx;
+        this.memForJoin = memForJoin;
+        this.buildRd = buildRd;
+        this.probeRd = probeRd;
+        this.buildHpc = probeHpc;
+        this.probeHpc = buildHpc;
+        this.buildKeys = keys0;
+        this.probeKeys = keys1;
+        this.comparators = comparators;
+        this.rel0Name = rel0Name;
+        this.rel1Name = rel1Name;
 
-    	this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
-    	this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+        this.numOfPartitions = numOfPartitions;
+        this.buildRFWriters = new RunFileWriter[numOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numOfPartitions];
 
-    	this.isLeftOuter = isLeftOuter;
-        
-        
+        this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
+        this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+
+        this.isLeftOuter = isLeftOuter;
+
         this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -143,160 +141,148 @@
             }
         }
     }
-    
-    
-    public void initBuild(){
-    	memBuffs = new ByteBuffer[memForJoin];
+
+    public void initBuild() {
+        memBuffs = new ByteBuffer[memForJoin];
         curPBuff = new int[numOfPartitions];
         nextBuff = new int[memForJoin];
         pStatus = new BitSet(numOfPartitions);
         buildPSizeInTups = new int[numOfPartitions];
-        
+
         buildPSizeInFrames = new int[numOfPartitions];
         freeFramesCounter = memForJoin - numOfPartitions;
-        
-        for(int i=0; i<numOfPartitions; i++){		//Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+
+        for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
             memBuffs[i] = ctx.allocateFrame();
             curPBuff[i] = i;
             nextBuff[i] = -1;
-            buildPSizeInFrames[i] = 1;				//The dedicated initial buffer
+            buildPSizeInFrames[i] = 1; //The dedicated initial buffer
         }
-        											
-        nextFreeBuffIx = ( (numOfPartitions<memForJoin) ? numOfPartitions : -1);	//Setting the chain of unallocated frames
-        for(int i=numOfPartitions; i<memBuffs.length; i++){
-     	   nextBuff[i] = UNALLOCATED_FRAME;
+
+        nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+        for (int i = numOfPartitions; i < memBuffs.length; i++) {
+            nextBuff[i] = UNALLOCATED_FRAME;
         }
-        
+
         buildTupAppender = new FrameTupleAppender(ctx.getFrameSize());
-        
-        
+
     }
-    
+
     public void build(ByteBuffer buffer) throws HyracksDataException {
-    	accessorBuild.reset(buffer);
+        accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
             processTuple(i, pid);
             buildPSizeInTups[pid]++;
         }
-        
+
     }
-    
-    private void processTuple(int tid, int pid) throws HyracksDataException{
-        ByteBuffer partition = memBuffs[ curPBuff[pid] ];				//Getting current buffer for the target partition
-        
-        if(!pStatus.get(pid)){  										//resident partition
-        	buildTupAppender.reset(partition, false);
+
+    private void processTuple(int tid, int pid) throws HyracksDataException {
+        ByteBuffer partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
+
+        if (!pStatus.get(pid)) { //resident partition
+            buildTupAppender.reset(partition, false);
             while (true) {
-                if (!buildTupAppender.append(accessorBuild, tid)) {  	//partition does not have enough room
-                   int newBuffIx = allocateFreeBuffer(pid);
-                   if(newBuffIx == NO_MORE_FREE_BUFFER){    			//Spill one partition
-                       int pidToSpill = selectPartitionToSpill();
-                       if(pidToSpill != -1){
-                           spillPartition(pidToSpill);
-                           buildTupAppender.reset(memBuffs[pidToSpill], true);	
-                           processTuple(tid, pid);
-                           break;
-                       }
-                       else{    										//No more partition to spill
-                           throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
-                       }
-                   }
-                   else{    											//New Buffer allocated successfully
-                       partition = memBuffs[ curPBuff[pid] ];
-                       buildTupAppender.reset(partition, true);
-                       if (!buildTupAppender.append(accessorBuild, tid)){
-                           throw new HyracksDataException("Invalid State (Can not append to newly allocated buffer)");
-                       }
-                       else{
-                    	   buildPSizeInFrames[pid]++;
-                           break;
-                       }
-                   }
-                } 
-                else {  											//Tuple added to resident partition successfully
-                	break;
+                if (buildTupAppender.append(accessorBuild, tid)) { //Tuple added to resident partition successfully
+                    break;
                 }
+                //partition does not have enough room
+                int newBuffIx = allocateFreeBuffer(pid);
+                if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
+                    int pidToSpill = selectPartitionToSpill();
+                    if (pidToSpill == -1) { //No more partition to spill
+                        throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
+                    }
+                    spillPartition(pidToSpill);
+                    buildTupAppender.reset(memBuffs[pidToSpill], true);
+                    processTuple(tid, pid);
+                    break;
+                }  //New Buffer allocated successfully
+                partition = memBuffs[curPBuff[pid]]; //Current Buffer for the partition is now updated by allocateFreeBuffer() call above
+                buildTupAppender.reset(partition, true);
+                if (!buildTupAppender.append(accessorBuild, tid)) {
+                    throw new HyracksDataException("Invalid State (Can not append to newly allocated buffer)");
+                }
+                buildPSizeInFrames[pid]++;
+                break;
             }
-        }
-        else{   													//spilled partition
+        } else { //spilled partition
             boolean needClear = false;
             while (true) {
-            	buildTupAppender.reset(partition, needClear);
+                buildTupAppender.reset(partition, needClear);
                 if (buildTupAppender.append(accessorBuild, tid)) {
                     break;
-                } else {
-                	buildWrite(pid, partition);
-                    partition.clear();
-                    needClear = true;
-                    buildPSizeInFrames[pid]++;
                 }
+                //Dedicated in-memory buffer for the partition is full, needed to be flushed first 
+                buildWrite(pid, partition);
+                partition.clear();
+                needClear = true;
+                buildPSizeInFrames[pid]++;
             }
         }
     }
-    
+
     private int allocateFreeBuffer(int pid) {
-        if(nextFreeBuffIx != -1){
-            if( memBuffs[nextFreeBuffIx] == null){
+        if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
+            if (memBuffs[nextFreeBuffIx] == null) {
                 memBuffs[nextFreeBuffIx] = ctx.allocateFrame();
             }
             int curPartBuffIx = curPBuff[pid];
             curPBuff[pid] = nextFreeBuffIx;
             int oldNext = nextBuff[nextFreeBuffIx];
             nextBuff[nextFreeBuffIx] = curPartBuffIx;
-            if(oldNext == UNALLOCATED_FRAME){
+            if (oldNext == UNALLOCATED_FRAME) {
                 nextFreeBuffIx++;
-                if(nextFreeBuffIx == memForJoin){   				//No more free buffer
+                if (nextFreeBuffIx == memForJoin) { //No more free buffer
                     nextFreeBuffIx = NO_MORE_FREE_BUFFER;
                 }
-            }
-            else{
+            } else {
                 nextFreeBuffIx = oldNext;
             }
-            (memBuffs[ curPBuff[pid] ]).clear();
-            
+            (memBuffs[curPBuff[pid]]).clear();
+
             freeFramesCounter--;
             return (curPBuff[pid]);
-        }
-        else{
-           return NO_MORE_FREE_BUFFER;   							//A partitions needs to be spilled (if feasible)
+        } else {
+            return NO_MORE_FREE_BUFFER; //A partitions needs to be spilled (if feasible)
         }
     }
-    
-    private int selectPartitionToSpill(){
+
+    private int selectPartitionToSpill() {
         int maxSize = -1;
         int partitionToSpill = -1;
-        for(int i=0; i<buildPSizeInTups.length; i++){				//Find the largest partition, to spill
-            if( !pStatus.get(i) && (buildPSizeInTups[i]>maxSize) ){	
+        for (int i = 0; i < buildPSizeInTups.length; i++) { //Find the largest partition, to spill
+            if (!pStatus.get(i) && (buildPSizeInTups[i] > maxSize)) {
                 maxSize = buildPSizeInTups[i];
                 partitionToSpill = i;
             }
         }
         return partitionToSpill;
     }
-    
-    private void spillPartition(int pid) throws HyracksDataException{
-    	int curBuffIx = curPBuff[pid];
-    	ByteBuffer buff = null;		
-    	while(curBuffIx != END_OF_PARTITION){
+
+    private void spillPartition(int pid) throws HyracksDataException {
+        int curBuffIx = curPBuff[pid];
+        ByteBuffer buff = null;
+        while (curBuffIx != END_OF_PARTITION) {
             buff = memBuffs[curBuffIx];
             buildWrite(pid, buff);
             buff.clear();
-            
+
             int freedBuffIx = curBuffIx;
             curBuffIx = nextBuff[curBuffIx];
-            
-            if(freedBuffIx != pid){
-            	nextBuff[freedBuffIx] = nextFreeBuffIx;
-            	nextFreeBuffIx = freedBuffIx;
-            	freeFramesCounter++;
+
+            if (freedBuffIx != pid) {
+                nextBuff[freedBuffIx] = nextFreeBuffIx;
+                nextFreeBuffIx = freedBuffIx;
+                freeFramesCounter++;
             }
         }
-    	curPBuff[pid] = pid;	
-    	pStatus.set(pid);
+        curPBuff[pid] = pid;
+        pStatus.set(pid);
     }
-    
+
     private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
         RunFileWriter writer = buildRFWriters[pid];
         if (writer == null) {
@@ -307,348 +293,315 @@
         }
         writer.nextFrame(buff);
     }
-    
-    public void closeBuild() throws HyracksDataException{
-    	for(int i=0; i<numOfPartitions; i++){									//Remove Empty Partitions' allocated frame
-    		if(buildPSizeInTups[i] == 0){
-    			buildPSizeInFrames[i]--;
-    			nextBuff[ curPBuff[i] ] = nextFreeBuffIx;
-    			nextFreeBuffIx = curPBuff[i];
-    			curPBuff[i] = INVALID_BUFFER;
-    			freeFramesCounter++;
-    		}
-    	}
-    	
-    	ByteBuffer buff = null;
-    	for(int i=pStatus.nextSetBit(0); i>=0; i=pStatus.nextSetBit(i+1)){		//flushing and DeAllocating the dedicated buffers for the spilled partitions
-    		buff = memBuffs[i];
-    		accessorBuild.reset(buff);
-    		if (accessorBuild.getTupleCount() > 0) {
-    			buildWrite(i, buff);
-    			buildPSizeInFrames[i]++;
-    		}
-    		nextBuff[i] = nextFreeBuffIx;
-    		nextFreeBuffIx = i;
-    		freeFramesCounter++;
-    		curPBuff[i] = INVALID_BUFFER;
-    		
-    		if( buildRFWriters[i] != null){
-    			buildRFWriters[i].close();
-    		}
-    	}
-    	
-    	partitionTune();														//Trying to bring back as many spilled partitions as possible, making them resident
-    	
-    	int inMemTupCount = 0;
-    	numOfSpilledParts = 0;
-    	
-    	for(int i=0; i<numOfPartitions; i++){			
-    		if(!pStatus.get(i)){
-    			inMemTupCount += buildPSizeInTups[i];
-    		}
-    		else{
-    			numOfSpilledParts++;
-    		}
-    	}
-    	
-    	
-    	createInMemoryJoiner(inMemTupCount);
-    	cacheInMemJoin();
-    	
-    	int _stats_res_empty = 0;
-    	int _stats_res_full = 0;
-    	int _stats_spilled = 0;
-    	int _stats_tups_in_mem = 0;
-    	int _stats_sum_spilled_size = 0;
-    	
-    	for(int i=0; i<numOfPartitions; i++){
-    		if(!pStatus.get(i)){				//is resident
-    			if(buildPSizeInTups[i] == 0){	//is empty
-    				_stats_res_empty++;
-    			}
-    			else{
-    				_stats_res_full++;
-    			}
-    			_stats_tups_in_mem += buildPSizeInTups[i];
-    		}
-    		else{
-    			_stats_spilled++;
-    			_stats_sum_spilled_size += buildPSizeInTups[i];
-    		}
-    	}
+
+    public void closeBuild() throws HyracksDataException {
+        for (int i = 0; i < numOfPartitions; i++) { //Remove Empty Partitions' allocated frame
+            if (buildPSizeInTups[i] == 0) {
+                buildPSizeInFrames[i]--;
+                nextBuff[curPBuff[i]] = nextFreeBuffIx;
+                nextFreeBuffIx = curPBuff[i];
+                curPBuff[i] = INVALID_BUFFER;
+                freeFramesCounter++;
+            }
+        }
+
+        ByteBuffer buff = null;
+        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
+            buff = memBuffs[i];
+            accessorBuild.reset(buff);
+            if (accessorBuild.getTupleCount() > 0) {
+                buildWrite(i, buff);
+                buildPSizeInFrames[i]++;
+            }
+            nextBuff[i] = nextFreeBuffIx;
+            nextFreeBuffIx = i;
+            freeFramesCounter++;
+            curPBuff[i] = INVALID_BUFFER;
+
+            if (buildRFWriters[i] != null) {
+                buildRFWriters[i].close();
+            }
+        }
+
+        partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
+
+        int inMemTupCount = 0;
+        numOfSpilledParts = 0;
+
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (!pStatus.get(i)) {
+                inMemTupCount += buildPSizeInTups[i];
+            } else {
+                numOfSpilledParts++;
+            }
+        }
+
+        createInMemoryJoiner(inMemTupCount);
+        cacheInMemJoin();
     }
-    
+
     private void partitionTune() throws HyracksDataException {
-    	ArrayList<Integer> reloadSet = selectPartitionsToReload();
-    	
-    	for(int i=0; i<reloadSet.size(); i++){
-    		int pid = reloadSet.get(i);
-    		if(buildPSizeInFrames[pid]>0){
-    			int[] buffsToLoad = new int[ buildPSizeInFrames[pid] ];
-    			for(int j=0; j<buffsToLoad.length; j++){
-    				buffsToLoad[j] = nextFreeBuffIx;
-    				int oldNext = nextBuff[nextFreeBuffIx];
-    				if(oldNext == UNALLOCATED_FRAME){
-                        nextFreeBuffIx++;
-                        if(nextFreeBuffIx == memForJoin){   				//No more free buffer
-                            nextFreeBuffIx = NO_MORE_FREE_BUFFER;
-                        }
+        reloadBuffer = ctx.allocateFrame();
+        ArrayList<Integer> reloadSet = selectPartitionsToReload();
+        for (int i = 0; i < reloadSet.size(); i++) {
+            int pid = reloadSet.get(i);
+            int[] buffsToLoad = new int[buildPSizeInFrames[pid]];
+            for (int j = 0; j < buffsToLoad.length; j++) {
+                buffsToLoad[j] = nextFreeBuffIx;
+                int oldNext = nextBuff[nextFreeBuffIx];
+                if (oldNext == UNALLOCATED_FRAME) {
+                    nextFreeBuffIx++;
+                    if (nextFreeBuffIx == memForJoin) { //No more free buffer
+                        nextFreeBuffIx = NO_MORE_FREE_BUFFER;
                     }
-                    else{
-                        nextFreeBuffIx = oldNext;
-                    }
-    				
-    				
-    			}
-    			curPBuff[pid] = buffsToLoad[0];
-    			for(int k=1; k<buffsToLoad.length; k++){
-    				nextBuff[ buffsToLoad[k-1] ] = buffsToLoad[k];
-    			}
-    			loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
-    		}
-    	}
-    	reloadSet.clear();
-    	reloadSet = null;
+                } else {
+                    nextFreeBuffIx = oldNext;
+                }
+
+            }
+            curPBuff[pid] = buffsToLoad[0];
+            for (int k = 1; k < buffsToLoad.length; k++) {
+                nextBuff[buffsToLoad[k - 1]] = buffsToLoad[k];
+            }
+            loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
+        }
+        reloadSet.clear();
+        reloadSet = null;
     }
-    
+
     private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
-    	RunFileReader r = wr.createReader();
+        RunFileReader r = wr.createReader();
         r.open();
-        ByteBuffer bf = ctx.allocateFrame();
         int counter = 0;
         ByteBuffer mBuff = null;
-        while (r.nextFrame(bf)) {
-        	mBuff = memBuffs[ buffs[counter] ];
-        	if(mBuff == null){
-        		mBuff = ctx.allocateFrame();
-        		memBuffs[ buffs[counter] ] = mBuff;
-        	}
-        	FrameUtils.copy(bf, mBuff );
-        	counter++;
+        reloadBuffer.clear();
+        while (r.nextFrame(reloadBuffer)) {
+            mBuff = memBuffs[buffs[counter]];
+            if (mBuff == null) {
+                mBuff = ctx.allocateFrame();
+                memBuffs[buffs[counter]] = mBuff;
+            }
+            FrameUtils.copy(reloadBuffer, mBuff);
+            counter++;
+            reloadBuffer.clear();
         }
-        
-        int curNext = nextBuff[ buffs[ buffs.length-1 ] ];
-        nextBuff[ buffs[ buffs.length-1 ] ] = END_OF_PARTITION;
+
+        int curNext = nextBuff[buffs[buffs.length - 1]];
+        nextBuff[buffs[buffs.length - 1]] = END_OF_PARTITION;
         nextFreeBuffIx = curNext;
-        
+
         r.close();
         pStatus.set(pid, false);
-        buildRFWriters[pid] = null;   
+        buildRFWriters[pid] = null;
     }
-    
-    private ArrayList<Integer> selectPartitionsToReload(){
-    	ArrayList<Integer> p = new ArrayList<Integer>();
-    	for(int i=pStatus.nextSetBit(0); i>=0; i=pStatus.nextSetBit(i+1)){
-    		if(freeFramesCounter - buildPSizeInFrames[i] >= 0){
-    			p.add(i);
-    			freeFramesCounter -= buildPSizeInFrames[i];
-    		}
-    		if(freeFramesCounter<1){	//No more free buffer available
-    			return p;
-    		}
-    	}
-    	return p;
+
+    private ArrayList<Integer> selectPartitionsToReload() {
+        ArrayList<Integer> p = new ArrayList<Integer>();
+        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) {
+            if (buildPSizeInFrames[i]>0 && (freeFramesCounter - buildPSizeInFrames[i] >= 0) ) {
+                p.add(i);
+                freeFramesCounter -= buildPSizeInFrames[i];
+            }
+            if (freeFramesCounter < 1) { //No more free buffer available
+                return p;
+            }
+        }
+        return p;
     }
-    
-    private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException{
-    	ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
-    	this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, 
-    							new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, 
-    								new FrameTupleAccessor(ctx.getFrameSize(), buildRd), buildHpc,
-    									new FrameTuplePairComparator(probeKeys, buildKeys, comparators), 
-    										isLeftOuter, nullWriters1, table);
+
+    private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
+        this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
+                new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
+                        ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
+                        comparators), isLeftOuter, nullWriters1, table);
     }
-    
-    private void cacheInMemJoin() throws HyracksDataException{
-    	
-    	for(int pid=0; pid<numOfPartitions; pid++){	
-        	if(!pStatus.get(pid) ){
-        		int nextBuffIx = curPBuff[pid];
-                while(nextBuffIx > -1 ){		//It is not Invalid or End_Of_Partition
-                	inMemJoiner.build(memBuffs[nextBuffIx]);
+
+    private void cacheInMemJoin() throws HyracksDataException {
+
+        for (int pid = 0; pid < numOfPartitions; pid++) {
+            if (!pStatus.get(pid)) {
+                int nextBuffIx = curPBuff[pid];
+                while (nextBuffIx > -1) { //It is not Invalid or End_Of_Partition
+                    inMemJoiner.build(memBuffs[nextBuffIx]);
                     nextBuffIx = nextBuff[nextBuffIx];
                 }
             }
         }
     }
-    
-    public void initProbe(){
-    	
-    		sPartBuffs = new ByteBuffer[numOfSpilledParts];
-            for(int i=0; i<numOfSpilledParts; i++){
-            	sPartBuffs[i] = ctx.allocateFrame();
-            }
-            curPBuff = new int[numOfPartitions];
-            int nextBuffIxToAlloc = 0;
-            for(int i=0; i<numOfPartitions; i++){
-                curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
-            }
-            probePSizeInTups = new int[numOfPartitions];
-            probeRFWriters = new RunFileWriter[numOfPartitions];
-            
-            probeResBuff = ctx.allocateFrame();
-            
-            probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
-            probeTupAppenderToResident.reset(probeResBuff, true);
-           
-            probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
-            
-    }
-    
-    public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException{
-    
-    	accessorProbe.reset(buffer);
-        int tupleCount = accessorProbe.getTupleCount();
-        
-        
-        if(numOfSpilledParts == 0){
-        	inMemJoiner.join(buffer, writer);
-        	return;
+
+    public void initProbe() {
+
+        sPartBuffs = new ByteBuffer[numOfSpilledParts];
+        for (int i = 0; i < numOfSpilledParts; i++) {
+            sPartBuffs[i] = ctx.allocateFrame();
         }
-        
+        curPBuff = new int[numOfPartitions];
+        int nextBuffIxToAlloc = 0;
+        /* We only need to allocate one frame per spilled partition. 
+         * Resident partitions do not need frames in probe, as their tuples join 
+         * immediately with the resident build tuples using the inMemoryHashJoin */
+        for (int i = 0; i < numOfPartitions; i++) { 
+            curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
+        }
+        probePSizeInTups = new int[numOfPartitions];
+        probeRFWriters = new RunFileWriter[numOfPartitions];
+
+        probeResBuff = ctx.allocateFrame();
+
+        probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
+        probeTupAppenderToResident.reset(probeResBuff, true);
+
+        probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
+
+    }
+
+    public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+
+        accessorProbe.reset(buffer);
+        int tupleCount = accessorProbe.getTupleCount();
+
+        if (numOfSpilledParts == 0) {
+            inMemJoiner.join(buffer, writer);
+            return;
+        }
+
         for (int i = 0; i < tupleCount; ++i) {
-        	int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
- 
-        	
-        	if(buildPSizeInTups[pid]>0){	//Tuple has potential match from previous phase
-        		if( pStatus.get(pid) ){   	//pid is Spilled
+            int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
+
+            if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
+                if (pStatus.get(pid)) { //pid is Spilled
                     boolean needToClear = false;
-                    ByteBuffer buff = sPartBuffs[ curPBuff[pid] ];
+                    ByteBuffer buff = sPartBuffs[curPBuff[pid]];
                     while (true) {
-                    	probeTupAppenderToSpilled.reset(buff, needToClear);
+                        probeTupAppenderToSpilled.reset(buff, needToClear);
                         if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
                             break;
-                        } else {
-                        	probeWrite(pid, buff);
-                            buff.clear();
-                            needToClear = true;
-                        }
+                        } 
+                        probeWrite(pid, buff);
+                        buff.clear();
+                        needToClear = true;
                     }
-        		}
-        		else  {   					//pid is Resident
+                } else { //pid is Resident
                     while (true) {
-                    	
-                        if (!probeTupAppenderToResident.append(accessorProbe, i)) {
-                            inMemJoiner.join(probeResBuff, writer);
-                            probeTupAppenderToResident.reset(probeResBuff, true);
-                        } else{
-                        	break;
+                        if (probeTupAppenderToResident.append(accessorProbe, i)){
+                            break;
                         }
-                    } 
-                   
+                        inMemJoiner.join(probeResBuff, writer);
+                        probeTupAppenderToResident.reset(probeResBuff, true);
+                    }
+
                 }
-        		probePSizeInTups[pid]++; 
-        	}
-        	
+                probePSizeInTups[pid]++;
+            }
+
         }
-        
-       
+
     }
-    
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException{	//We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
-    	inMemJoiner.join(probeResBuff, writer);
-    	inMemJoiner.closeJoin(writer);
-    	
-    	for(int pid=pStatus.nextSetBit(0); pid>=0; pid=pStatus.nextSetBit(pid+1)){
-            ByteBuffer buff = sPartBuffs[ curPBuff[pid] ];
+
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+        inMemJoiner.join(probeResBuff, writer);
+        inMemJoiner.closeJoin(writer);
+
+        for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
+            ByteBuffer buff = sPartBuffs[curPBuff[pid]];
             accessorProbe.reset(buff);
             if (accessorProbe.getTupleCount() > 0) {
-            	probeWrite(pid, buff);
+                probeWrite(pid, buff);
             }
             closeProbeWriter(pid);
-    	}
+        }
     }
-    
-    
+
     private void probeWrite(int pid, ByteBuffer buff) throws HyracksDataException {
         RunFileWriter pWriter = probeRFWriters[pid];
         if (pWriter == null) {
-        	FileReference file = ctx.createManagedWorkspaceFile(rel1Name);
-        	pWriter = new RunFileWriter(file, ctx.getIOManager());
+            FileReference file = ctx.createManagedWorkspaceFile(rel1Name);
+            pWriter = new RunFileWriter(file, ctx.getIOManager());
             pWriter.open();
             probeRFWriters[pid] = pWriter;
         }
         pWriter.nextFrame(buff);
     }
-    
+
     private void closeProbeWriter(int pid) throws HyracksDataException {
         RunFileWriter writer = probeRFWriters[pid];
         if (writer != null) {
             writer.close();
         }
     }
-    
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException{
-    	return ((buildRFWriters[pid]==null) ? null: (buildRFWriters[pid]).createReader());
+
+    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+        return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader());
     }
-    
-    public long getBuildPartitionSize(int pid){
-    	return ((buildRFWriters[pid]==null) ? 0: buildRFWriters[pid].getFileSize() );
+
+    public long getBuildPartitionSize(int pid) {
+        return ((buildRFWriters[pid] == null) ? 0 : buildRFWriters[pid].getFileSize());
     }
-    
-    public int getBuildPartitionSizeInTup(int pid){
-    	return (buildPSizeInTups[pid]);
+
+    public int getBuildPartitionSizeInTup(int pid) {
+        return (buildPSizeInTups[pid]);
     }
-    
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException{
-    	return ((probeRFWriters[pid]==null) ? null: (probeRFWriters[pid]).createReader());
+
+    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+        return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader());
     }
-    
-    public long getProbePartitionSize(int pid){
-    	return ((probeRFWriters[pid]==null) ? 0: probeRFWriters[pid].getFileSize() );
+
+    public long getProbePartitionSize(int pid) {
+        return ((probeRFWriters[pid] == null) ? 0 : probeRFWriters[pid].getFileSize());
     }
-    
-    public int getProbePartitionSizeInTup(int pid){
-    	return (probePSizeInTups[pid]);
+
+    public int getProbePartitionSizeInTup(int pid) {
+        return (probePSizeInTups[pid]);
     }
-    
-    public int getMaxBuildPartitionSize(){
-    	int max = buildPSizeInTups[0];
-    	for(int i=1; i<buildPSizeInTups.length; i++){
-    		if( buildPSizeInTups[i] > max ){
-    			max = buildPSizeInTups[i];
-    		}
-    	}
-    	return max;
+
+    public int getMaxBuildPartitionSize() {
+        int max = buildPSizeInTups[0];
+        for (int i = 1; i < buildPSizeInTups.length; i++) {
+            if (buildPSizeInTups[i] > max) {
+                max = buildPSizeInTups[i];
+            }
+        }
+        return max;
     }
-    
-    public int getMaxProbePartitionSize(){
-    	int max = probePSizeInTups[0];
-    	for(int i=1; i<probePSizeInTups.length; i++){
-    		if( probePSizeInTups[i] > max ){
-    			max = probePSizeInTups[i];
-    		}
-    	}
-    	return max;
+
+    public int getMaxProbePartitionSize() {
+        int max = probePSizeInTups[0];
+        for (int i = 1; i < probePSizeInTups.length; i++) {
+            if (probePSizeInTups[i] > max) {
+                max = probePSizeInTups[i];
+            }
+        }
+        return max;
     }
-    
-    public BitSet getPartitinStatus(){
-    	return pStatus;
+
+    public BitSet getPartitinStatus() {
+        return pStatus;
     }
-    
-    public String _stats_debug_getStats(){
-    	int numOfResidentPartitions = 0;
-    	int numOfSpilledPartitions = 0;
-    	double sumOfBuildSpilledSizes = 0;
-    	double sumOfProbeSpilledSizes = 0;
-    	int numOfInMemTups = 0;
-    	for(int i=0; i<numOfPartitions; i++){
-    		if(pStatus.get(i)){	//Spilled
-    			numOfSpilledPartitions++;
-    			sumOfBuildSpilledSizes += buildPSizeInTups[i];
-    			sumOfProbeSpilledSizes += probePSizeInTups[i];
-    		}
-    		else{				//Resident
-    			numOfResidentPartitions++;
-    			numOfInMemTups += buildPSizeInTups[i];
-    		}
-    	}
-    	
-    	double avgBuildSpSz = sumOfBuildSpilledSizes/numOfSpilledPartitions;
-    	double avgProbeSpSz = sumOfProbeSpilledSizes/numOfSpilledPartitions;
-    	String s = "Resident Partitions:\t"+numOfResidentPartitions+"\nSpilled Partitions:\t"+numOfSpilledPartitions+
-    					"\nAvg Build Spilled Size:\t"+avgBuildSpSz+"\nAvg Probe Spilled Size:\t"+avgProbeSpSz+
-    						"\nIn-Memory Tups:\t"+numOfInMemTups+"\nNum of Free Buffers:\t"+freeFramesCounter;
-    	return s;
+
+    public String _stats_debug_getStats() {
+        int numOfResidentPartitions = 0;
+        int numOfSpilledPartitions = 0;
+        double sumOfBuildSpilledSizes = 0;
+        double sumOfProbeSpilledSizes = 0;
+        int numOfInMemTups = 0;
+        for (int i = 0; i < numOfPartitions; i++) {
+            if (pStatus.get(i)) { //Spilled
+                numOfSpilledPartitions++;
+                sumOfBuildSpilledSizes += buildPSizeInTups[i];
+                sumOfProbeSpilledSizes += probePSizeInTups[i];
+            } else { //Resident
+                numOfResidentPartitions++;
+                numOfInMemTups += buildPSizeInTups[i];
+            }
+        }
+
+        double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
+        double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
+        String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
+                + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
+                + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
+                + freeFramesCounter;
+        return s;
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index bcb46f5..e6a7035 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -40,42 +40,83 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
+/**
+ * @author pouria
+ *  This class guides the joining process, and switches between different
+    joining techniques, w.r.t the implemented optimizations and skew in size of the
+    partitions.
+    
+    - Operator overview:
+    Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size
+    of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase
+    can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow:
+    
+    * BUILD:
+    – Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
+    – Initialize the build phase (one frame per partition – all partitions considered resident at first)
+    – Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
+        its target partition and try to append it to that partition:
+    – If target partition's buffer is full, try to allocate a new buffer for it.
+    – if no free buffer is available, find the largest resident partition and spill it. Using its freed
+        buffers after spilling, allocate a new buffer for the target partition.
+    – Being done with R, close the build phase. (During closing we write the very last buffer of each
+    spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to
+    spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough
+    for reloading an entire partition back into memory)
+    – Create the hash table for the resident partitions (basically we create an in-memory hash join here)
 
+    * PROBE:
+    – Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
+        for the whole resident partitions)
+    – Read tuples of S, frame by frame and hash each tuple T to its target partition P
+    – if P is a resident partition, pass T to the in-memory hash join and generate the output record,
+        if any matching(s) record found
+    – if P is spilled, write T to the dedicated buffer for P (on the probe side)
+    – Once scanning of S is done, we try to join partition pairs (Ri, Si) of the spilled partitions:
+    – if any of Ri or Si is smaller than M, then we simply use an in-memory hash join to join them
+    – otherwise we apply HHJ recursively:
+    – if after applying HHJ recursively, we do not gain enough size reduction (max size of the
+        resulting partitions were more than 80% of the initial Ri,Si size) then we switch to
+        nested loop join for joining.
+    – (At each step of partition-pair joining, we consider role reversal, which means if size of Si were
+        greater than Ri, then we make sure that we switch the roles of build/probe between them)
+ */
 
 public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-	
-	private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+
+    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
     private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-	
-	private static final long serialVersionUID = 1L;
-	private static final double NLJ_SWITCH_THRESHOLD = 0.8;
-	
-    
+
+    private static final long serialVersionUID = 1L;
+    private static final double NLJ_SWITCH_THRESHOLD = 0.8;
+
     private static final String PROBE_REL = "RelR";
     private static final String BUILD_REL = "RelS";
-    
+
     private final int memsize;
-	private final int inputsize0;
-	private final double factor;
+    private final int inputsize0;
+    private final double fudgeFactor;
     private final int[] probeKeys;
     private final int[] buildKeys;
     private final IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories;			//For in-mem HJ
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory0;	//For NLJ in probe
-    private final ITuplePairComparatorFactory tuplePairComparatorFactory1;	//For NLJ in probe
-    
+    private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
+    private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
+    private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
+
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
-    
-    public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories, 
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, 
-            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException  {
-       
+
+    public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
+            int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+            ITuplePairComparatorFactory tupPaircomparatorFactory0,
+            ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
-        this.factor = factor;
+        this.fudgeFactor = factor;
         this.probeKeys = keys0;
         this.buildKeys = keys1;
         this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
@@ -85,18 +126,19 @@
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
-        
+
     }
-    
-    public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize,int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
-            throws HyracksDataException  {
-       
+
+    public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
+            int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
+            throws HyracksDataException {
+
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
-        this.factor = factor;
+        this.fudgeFactor = factor;
         this.probeKeys = keys0;
         this.buildKeys = keys1;
         this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
@@ -107,12 +149,12 @@
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
     }
-    
+
     @Override
-	public void contributeActivities(IActivityGraphBuilder builder) {
-    	PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
                 BUILD_AND_PARTITION_ACTIVITY_ID));
-    	ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
+        ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
                 PARTITION_AND_JOIN_ACTIVITY_ID));
 
         builder.addActivity(phase1);
@@ -124,43 +166,36 @@
         builder.addBlockingEdge(phase1, phase2);
 
         builder.addTargetEdge(0, phase2, 0);
-		
-	}
-    
-    
-    	//memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
-    private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) throws HyracksDataException{
-    	int B = 0;
-    	if (memorySize > 1) {
-            if (memorySize > buildSize) {
-               return 1;		//We will switch to in-Mem HJ eventually
-            }
-            else {
-                B = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize)
-                        / (double) (memorySize - 1)));
-                if (B <= 0) {
-                    B = 1;			//becomes in-memory hash join
-                }
-            }
-            if(B > memorySize){
-            	B = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
-            	return ( B<memorySize ? B : memorySize );
-            }
-        }
-    	
-    	else {
-            throw new HyracksDataException("not enough memory for Hybrid Hash Join");
-        }
 
-    	return B;
     }
-    
+
+    //memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
+    private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+            throws HyracksDataException {
+        int numberOfPartitions = 0;
+        if (memorySize <= 1) {
+            throw new HyracksDataException("not enough memory is available for Hybrid Hash Join");
+        }
+        if (memorySize > buildSize) {
+            return 1; //We will switch to in-Mem HJ eventually
+        } 
+        numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize) / (double) (memorySize - 1)));
+        if (numberOfPartitions <= 0) {
+            numberOfPartitions = 1; //becomes in-memory hash join
+        }
+        if (numberOfPartitions > memorySize) {
+            numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
+            return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize);
+        }
+        return numberOfPartitions;
+    }
+
     public static class BuildAndPartitionTaskState extends AbstractTaskState {
-        
-    	private int memForJoin;
-    	private int numOfPartitions;
-    	private OptimizedHybridHashJoin hybridHJ;
-        
+
+        private int memForJoin;
+        private int numOfPartitions;
+        private OptimizedHybridHashJoin hybridHJ;
+
         public BuildAndPartitionTaskState() {
         }
 
@@ -180,388 +215,400 @@
 
     }
     
-    
+    /*
+     * Build phase of Hybrid Hash Join:
+     * Creating an instance of Hybrid Hash Join, using Shapiro's formula
+     * to get the optimal number of partitions, build relation is read and
+     * partitioned, and hybrid hash join instance gets ready for the probing.
+     * (See OptimizedHybridHashJoin for the details on different steps)
+     */
     private class PartitionAndBuildActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        
-        
+
         public PartitionAndBuildActivityNode(ActivityId id) {
             super(id);
         }
-        
+
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            
-        	
-        	
+
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
-            
+
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
+            for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            
+
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
                 for (int i = 0; i < nullWriterFactories1.length; i++) {
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-            	 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
-                         .getJobId(), new TaskId(getActivityId(), partition));
-            	
-            	ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories).createPartitioner(0);
-            	ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories).createPartitioner(0);
-            	
-            	
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+                        .getJobId(), new TaskId(getActivityId(), partition));
+
+                ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
+                        hashFunctionGeneratorFactories).createPartitioner(0);
+                ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
+                        hashFunctionGeneratorFactories).createPartitioner(0);
+
                 @Override
                 public void open() throws HyracksDataException {
-                	
-                	if(memsize > 2){
-                		state.memForJoin = memsize - 2;
-                		state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, factor, nPartitions);
-                		state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                				PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, 
-                				probeRd, buildRd, probeHpc, buildHpc);
-                		state.hybridHJ.initBuild();
-                	}
-                	else{
-                		throw new HyracksDataException("not enough memory for Hybrid Hash Join");
-                	}
-                	
+                    if (memsize <= 2){  //Dedicated buffers: One buffer to read and one buffer for output
+                        throw new HyracksDataException("not enough memory for Hybrid Hash Join");
+                    }
+                    state.memForJoin = memsize - 2;
+                    state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
+                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                            buildHpc);
+                    state.hybridHJ.initBuild();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     state.hybridHJ.build(buffer);
                 }
-                
+
                 @Override
                 public void close() throws HyracksDataException {
-                	state.hybridHJ.closeBuild();
-                	env.setTaskState(state);
+                    state.hybridHJ.closeBuild();
+                    env.setTaskState(state);
                 }
 
-				@Override
-				public void fail() throws HyracksDataException {
-				}
-                
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+
             };
             return op;
         }
     }
-    
+    /*
+     * Probe phase of Hybrid Hash Join:
+     * Reading the probe side and partitioning it, resident tuples get
+     * joined with the build side residents (through formerly created HybridHashJoin in the build phase)
+     * and spilled partitions get written to run files. During the close() call, pairs of spilled partition
+     * (build side spilled partition and its corresponding probe side spilled partition) join, by applying
+     * Hybrid Hash Join recursively on them.
+     */
     private class ProbeAndJoinActivityNode extends AbstractActivityNode {
 
-    	private static final long serialVersionUID = 1L;
-    	private int maxLevel = -1;
-    	
-    	public ProbeAndJoinActivityNode(ActivityId id){
-    		super(id);
-    	}
+        private static final long serialVersionUID = 1L;
+        
 
-    	@Override
-    	public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-    			IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+        public ProbeAndJoinActivityNode(ActivityId id) {
+            super(id);
+        }
 
-    		final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-    		final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
-    		final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-    		final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
-    		final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
-    		for (int i = 0; i < comparatorFactories.length; ++i) {
-    			comparators[i] = comparatorFactories[i].createBinaryComparator();
-    		}
+            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
+            final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
 
-    		final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
-    		if (isLeftOuter) {
-    			for (int i = 0; i < nullWriterFactories1.length; i++) {
-    				nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
-    			}
-    		}
+            for (int i = 0; i < comparatorFactories.length; i++) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
 
-    		IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-    			private BuildAndPartitionTaskState state;
-    			private ByteBuffer rPartbuff = ctx.allocateFrame();
-    			
-    			private ITuplePartitionComputerGeneratorFactory hpcf0 = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories);
-    			private ITuplePartitionComputerGeneratorFactory hpcf1 = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories);
-    			
-    			private ITuplePartitionComputer hpcRep0;
-    			private ITuplePartitionComputer hpcRep1;
-    			
-    			@Override
-    			public void open() throws HyracksDataException {
-    				state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private BuildAndPartitionTaskState state;
+                private ByteBuffer rPartbuff = ctx.allocateFrame();
+
+                private ITuplePartitionComputerGeneratorFactory hpcf0 = new FieldHashPartitionComputerGeneratorFactory(
+                        probeKeys, hashFunctionGeneratorFactories);
+                private ITuplePartitionComputerGeneratorFactory hpcf1 = new FieldHashPartitionComputerGeneratorFactory(
+                        buildKeys, hashFunctionGeneratorFactories);
+
+                private ITuplePartitionComputer hpcRep0;
+                private ITuplePartitionComputer hpcRep1;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
                             BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-    				
-    				writer.open();
-    				state.hybridHJ.initProbe();
-    				
-    			}
 
-    			@Override
-    			public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-    				state.hybridHJ.probe(buffer, writer);
-    			}
+                    writer.open();
+                    state.hybridHJ.initProbe();
 
-    			@Override
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    state.hybridHJ.probe(buffer, writer);
+                }
+
+                @Override
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
 
-    			@Override
-    			public void close() throws HyracksDataException {
+                @Override
+                public void close() throws HyracksDataException {
 
-    				state.hybridHJ.closeProbe(writer);
-    				
-    				BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
-    				hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0).createPartitioner(0);
-    				hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1).createPartitioner(0);
-    				
-    				rPartbuff.clear();     
-    				for(int pid=partitionStatus.nextSetBit(0); pid>=0; pid=partitionStatus.nextSetBit(pid+1)){
+                    state.hybridHJ.closeProbe(writer);
 
-    					RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
-						RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-    						
-    					if (bReader == null || pReader == null) {
-    						continue;
-    					}
-    					int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
-    					int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
-    					int beforeMax = (bSize>pSize) ? bSize : pSize;
-    					joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-    						
-    				}
-    				writer.close();
-    			}
-    			
-    			private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader, RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException{
-    				ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories).createPartitioner(level);
-                	ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories).createPartitioner(level);
-        			
-    				if(level > maxLevel){
-    					maxLevel = level;
-    				}
-    				
-    				long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
-					long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
-    				
-						//Apply in-Mem HJ if possible
-					if( (buildPartSize<state.memForJoin) || (probePartSize<state.memForJoin)  ){
-						int tabSize = -1;
-						if(buildPartSize<probePartSize){
-							tabSize = ohhj.getBuildPartitionSizeInTup(pid);
-							if(tabSize>0){	//Build Side is smaller
-								applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0, buildSideReader, probeSideReader);
-							}
-						}
-						else{   //Role Reversal
-							tabSize = ohhj.getProbePartitionSizeInTup(pid);
-							if(tabSize>0){	//Probe Side is smaller
-								applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1, probeSideReader, buildSideReader);
-							}
-						}
-					}
-						//Apply (Recursive) HHJ
-					else {
-						OptimizedHybridHashJoin rHHj;
-						if(buildPartSize<probePartSize){	//Build Side is smaller
-							
-							int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, factor, nPartitions);
-							rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin,
-									n, PROBE_REL, BUILD_REL,
-										probeKeys, buildKeys, comparators,
-											probeRd, buildRd, probeHpc, buildHpc);
-							
-							
-							buildSideReader.open();
-							rHHj.initBuild();
-		    				rPartbuff.clear();
-		    				while (buildSideReader.nextFrame(rPartbuff)) {
-		    					rHHj.build(rPartbuff);
-		    				}
-		    				
-		    				rHHj.closeBuild();
-		    				
-		    				probeSideReader.open();
-		    				rHHj.initProbe();
-		    				rPartbuff.clear();
-		    				while (probeSideReader.nextFrame(rPartbuff)) {
-		    					rHHj.probe(rPartbuff, writer);
-		    				}
-		    				rHHj.closeProbe(writer);
-		    				
-		    				int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-		    				int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-		    				int afterMax = (maxAfterBuildSize>maxAfterProbeSize) ? maxAfterBuildSize : maxAfterProbeSize;
-		    				
-		    				BitSet rPStatus = rHHj.getPartitinStatus();
-		    				if(afterMax < NLJ_SWITCH_THRESHOLD*beforeMax){
-		    					for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
-		    						RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-		    						RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-		    						
-		    						if (rbrfw == null || rprfw == null) {
-		        						continue;
-		        					}
-		    						
-		    						joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level+1));
-		    					}
-		    					
-		    				}
-		    				else{	//Switch to NLJ (Further recursion seems not to be useful)
-		    					for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
-		    						RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-		    						RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-		    						
-		    						if (rbrfw == null || rprfw == null) {
-		        						continue;
-		        					}
-		    						
-		    						int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
-		    						int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-		    						if(buildSideInTups<probeSideInTups){
-		    							applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw, nljComparator0);
-		    						}
-		    						else{
-		    							applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw, nljComparator1);
-		    						}
-		    					}
-		    				}
-						}
-						else{ //Role Reversal (Probe Side is smaller)
-							int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, factor, nPartitions);
-							rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin,
-									n, BUILD_REL, PROBE_REL,
-										buildKeys, probeKeys, comparators,
-											buildRd, probeRd,buildHpc, probeHpc);
-							
-							probeSideReader.open();
-							rHHj.initBuild();
-		    				rPartbuff.clear();
-		    				while (probeSideReader.nextFrame(rPartbuff)) {
-		    					rHHj.build(rPartbuff);
-		    				}
-		    				rHHj.closeBuild();
-		    				rHHj.initProbe();
-		    				buildSideReader.open();
-		    				rPartbuff.clear();
-		    				while (buildSideReader.nextFrame(rPartbuff)) {
-		    					rHHj.probe(rPartbuff, writer);
-		    				}
-		    				rHHj.closeProbe(writer);
-		    				int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-		    				int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-		    				int afterMax = (maxAfterBuildSize>maxAfterProbeSize) ? maxAfterBuildSize : maxAfterProbeSize;
-		    				BitSet rPStatus = rHHj.getPartitinStatus();
-		    				
-		    				if(afterMax < NLJ_SWITCH_THRESHOLD*beforeMax){
-		    					for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
-		    						RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-		    						RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-		    						
-		    						if (rbrfw == null || rprfw == null) {
-		        						continue;
-		        					}
-		    						
-		    						joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level+1));
-		    					}
-		    				}
-		    				else{	//Switch to NLJ (Further recursion seems not to be effective)
-		    					for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
-		    						RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-		    						RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-		    						
-		    						if (rbrfw == null || rprfw == null) {
-		        						continue;
-		        					}
-		    						
-		    						long buildSideSize = rbrfw.getFileSize();
-		    						long probeSideSize = rprfw.getFileSize();
-		    						if(buildSideSize>probeSideSize){
-		    							applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw, nljComparator1);
-		    						}
-		    						else{
-		    							applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw, nljComparator0);
-		    						}
-		    					}
-		    				}
-						}
-						buildSideReader.close();
-						probeSideReader.close();
-					}
-    			}
-    			
-    			private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, 
-    					RecordDescriptor smallerRd, RecordDescriptor largerRd, 
-    					ITuplePartitionComputer hpcRepLarger, ITuplePartitionComputer hpcRepSmaller,
-    					RunFileReader bReader, RunFileReader pReader) throws HyracksDataException{
-    				
-    				ISerializableTable table = new SerializableHashTable(tabSize, ctx);
-    				InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, 
-    						new FrameTupleAccessor(ctx.getFrameSize(), largerRd), hpcRepLarger, 
-    						new FrameTupleAccessor(ctx.getFrameSize(), smallerRd), hpcRepSmaller, 
-    						new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table);
+                    BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
+                    hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
+                    .createPartitioner(0);
+                    hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
+                    .createPartitioner(0);
 
-    				bReader.open();
-    				rPartbuff.clear();
-    				while (bReader.nextFrame(rPartbuff)) {
-    					ByteBuffer copyBuffer = ctx.allocateFrame();
-    					FrameUtils.copy(rPartbuff, copyBuffer);
-    					FrameUtils.makeReadable(copyBuffer);
-    					joiner.build(copyBuffer);
-    					rPartbuff.clear();
-    				}
-    				bReader.close();
-    				rPartbuff.clear();
-    				// probe
-    				pReader.open();
-    				while (pReader.nextFrame(rPartbuff)) {
-    					joiner.join(rPartbuff, writer);
-    					rPartbuff.clear();
-    				}
-    				pReader.close();
-    				joiner.closeJoin(writer);
-    			}
+                    rPartbuff.clear();
+                    for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
 
-    			private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljCompaarator) throws HyracksDataException{
-    				
-    				NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-    						new FrameTupleAccessor(ctx.getFrameSize(), innerRd), 
-    						nljCompaarator, memorySize);
+                        RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
+                        RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
 
+                        if (bReader == null || pReader == null) {   //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+                            continue;
+                        }
+                        int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
+                        int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+                        int beforeMax = (bSize > pSize) ? bSize : pSize;
+                        joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
 
-    				ByteBuffer cacheBuff = ctx.allocateFrame();
-    				innerReader.open();
-    				while(innerReader.nextFrame(cacheBuff)){
-    					FrameUtils.makeReadable(cacheBuff);
-    					nlj.cache(cacheBuff);
-    					cacheBuff.clear();
-    				}
-    				nlj.closeCache();
-    	
-    				ByteBuffer joinBuff = ctx.allocateFrame();
-    				outerReader.open();
-    				
-    				while(outerReader.nextFrame(joinBuff)){
-    					FrameUtils.makeReadable(joinBuff);
-    					nlj.join(joinBuff, writer);
-    					joinBuff.clear();
-    				}
-    				
-    				nlj.closeJoin(writer);
-    				outerReader.close();
-    				innerReader.close();
-    			}
-    		};
-    		return op;
-    	}
+                    }
+                    writer.close();
+                }
+
+                private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
+                        RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
+                    ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
+                            hashFunctionGeneratorFactories).createPartitioner(level);
+                    ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
+                            hashFunctionGeneratorFactories).createPartitioner(level);
+
+                    long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
+                    long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+
+                        //Apply in-Mem HJ if possible
+                    if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
+                        int tabSize = -1;
+                        if (buildPartSize < probePartSize) {
+                            tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+                            if(tabSize == 0){
+                                throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+                            }
+                            //Build Side is smaller
+                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
+                                    buildSideReader, probeSideReader);
+                            
+                        } else { //Role Reversal
+                            tabSize = ohhj.getProbePartitionSizeInTup(pid);
+                            if(tabSize == 0){
+                                throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+                            }
+                            //Probe Side is smaller
+                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
+                                    probeSideReader, buildSideReader);
+                        }
+                    }
+                        //Apply (Recursive) HHJ
+                    else {
+                        OptimizedHybridHashJoin rHHj;
+                        if (buildPartSize < probePartSize) { //Build Side is smaller
+
+                            int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor, nPartitions);
+                            rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
+                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
+
+                            buildSideReader.open();
+                            rHHj.initBuild();
+                            rPartbuff.clear();
+                            while (buildSideReader.nextFrame(rPartbuff)) {
+                                rHHj.build(rPartbuff);
+                            }
+
+                            rHHj.closeBuild();
+
+                            probeSideReader.open();
+                            rHHj.initProbe();
+                            rPartbuff.clear();
+                            while (probeSideReader.nextFrame(rPartbuff)) {
+                                rHHj.probe(rPartbuff, writer);
+                            }
+                            rHHj.closeProbe(writer);
+
+                            int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+                            int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                            int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
+                                    : maxAfterProbeSize;
+
+                            BitSet rPStatus = rHHj.getPartitinStatus();
+                            if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                    if (rbrfw == null || rprfw == null) {
+                                        continue;
+                                    }
+
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+                                }
+
+                            } else { //Switch to NLJ (Further recursion seems not to be useful)
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                    if (rbrfw == null || rprfw == null) {
+                                        continue;
+                                    }
+
+                                    int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+                                    int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+                                    if (buildSideInTups < probeSideInTups) {
+                                        applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+                                                nljComparator0);
+                                    } else {
+                                        applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
+                                                nljComparator1);
+                                    }
+                                }
+                            }
+                        } else { //Role Reversal (Probe Side is smaller)
+                            int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor, nPartitions);
+                            rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
+                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
+
+                            probeSideReader.open();
+                            rHHj.initBuild();
+                            rPartbuff.clear();
+                            while (probeSideReader.nextFrame(rPartbuff)) {
+                                rHHj.build(rPartbuff);
+                            }
+                            rHHj.closeBuild();
+                            rHHj.initProbe();
+                            buildSideReader.open();
+                            rPartbuff.clear();
+                            while (buildSideReader.nextFrame(rPartbuff)) {
+                                rHHj.probe(rPartbuff, writer);
+                            }
+                            rHHj.closeProbe(writer);
+                            int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+                            int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                            int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
+                                    : maxAfterProbeSize;
+                            BitSet rPStatus = rHHj.getPartitinStatus();
+
+                            if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                    if (rbrfw == null || rprfw == null) {
+                                        continue;
+                                    }
+
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+                                }
+                            } else { //Switch to NLJ (Further recursion seems not to be effective)
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                    RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                    RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                    if (rbrfw == null || rprfw == null) {
+                                        continue;
+                                    }
+
+                                    long buildSideSize = rbrfw.getFileSize();
+                                    long probeSideSize = rprfw.getFileSize();
+                                    if (buildSideSize > probeSideSize) {
+                                        applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
+                                                nljComparator1);
+                                    } else {
+                                        applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
+                                                nljComparator0);
+                                    }
+                                }
+                            }
+                        }
+                        buildSideReader.close();
+                        probeSideReader.close();
+                    }
+                }
+
+                private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
+                        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
+                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+                        throws HyracksDataException {
+
+                    ISerializableTable table = new SerializableHashTable(tabSize, ctx);
+                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
+                            ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
+                            buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
+                            isLeftOuter, nullWriters1, table);
+
+                    bReader.open();
+                    rPartbuff.clear();
+                    while (bReader.nextFrame(rPartbuff)) {
+                        ByteBuffer copyBuffer = ctx.allocateFrame();    //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        FrameUtils.copy(rPartbuff, copyBuffer);
+                        FrameUtils.makeReadable(copyBuffer);
+                        joiner.build(copyBuffer);
+                        rPartbuff.clear();
+                    }
+                    bReader.close();
+                    rPartbuff.clear();
+                    // probe
+                    pReader.open();
+                    while (pReader.nextFrame(rPartbuff)) {
+                        joiner.join(rPartbuff, writer);
+                        rPartbuff.clear();
+                    }
+                    pReader.close();
+                    joiner.closeJoin(writer);
+                }
+
+                private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
+                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
+                        throws HyracksDataException {
+
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
+
+                    ByteBuffer cacheBuff = ctx.allocateFrame();
+                    innerReader.open();
+                    while (innerReader.nextFrame(cacheBuff)) {
+                        FrameUtils.makeReadable(cacheBuff);
+                        nlj.cache(cacheBuff);
+                        cacheBuff.clear();
+                    }
+                    nlj.closeCache();
+
+                    ByteBuffer joinBuff = ctx.allocateFrame();
+                    outerReader.open();
+
+                    while (outerReader.nextFrame(joinBuff)) {
+                        FrameUtils.makeReadable(joinBuff);
+                        nlj.join(joinBuff, writer);
+                        joinBuff.clear();
+                    }
+
+                    nlj.closeJoin(writer);
+                    outerReader.close();
+                    innerReader.close();
+                }
+            };
+            return op;
+        }
     }
 }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index 96a24b3..3820119 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -3,7 +3,7 @@
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
+
 
 import org.junit.Test;
 
@@ -68,7 +68,7 @@
     	JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/customer4.tbl"))) };
+                "data/tpch0.001/customer4.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -78,7 +78,7 @@
 
         
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/orders4.tbl"))) };
+                "data/tpch0.001/orders4.tbl"))) };
         
         
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
@@ -153,7 +153,7 @@
     	JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/customer3.tbl"))) };
+                "data/tpch0.001/customer3.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -163,14 +163,9 @@
 
         
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/orders4.tbl"))) };
+                "data/tpch0.001/orders4.tbl"))) };
         
         
-        /*
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-        							"data/tpch0.001/pouria/integration/orders4.tbl"))) };
-        */
-        
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -243,7 +238,7 @@
     	JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/customer3.tbl"))) };
+                "data/tpch0.001/customer3.tbl"))) };
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -253,13 +248,9 @@
 
         
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-                "data/tpch0.001/pouria/integration/orders1.tbl"))) };
+                "data/tpch0.001/orders1.tbl"))) };
         
         
-        /*
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-        							"data/tpch0.001/pouria/integration/orders4.tbl"))) };
-        */
         
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {