Changes to fix memory leak in Join operators
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 979ef59..994b0ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -139,16 +139,7 @@
                 boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
                 	matchFound = true;
-                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
-                        flushFrame(outBuffer, writer);
-                        appender.reset(outBuffer, true);
-                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
-                            int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i)
-                                    + accessorInner.getTupleEndOffset(j) - accessorInner.getTupleStartOffset(j);
-                            throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                                    + appender.getBuffer().capacity() + ")");
-                        }
-                    }
+                	appendToResults(i, j, writer);
                 }
             }
 
@@ -177,7 +168,35 @@
     		return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
     	}
     }
-
+    
+    private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException{
+    	if(!isReversed){
+    		if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+                    int tSize = accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId)
+                            + accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId);
+                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+                            + appender.getBuffer().capacity() + ")");
+                }
+            }
+    	}
+    	else{ //Role Reversal Optimization is triggered
+    		if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+                    int tSize = accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId)+
+                    		accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId);
+                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+                            + appender.getBuffer().capacity() + ")");
+                }
+            }
+    	}
+    	
+    }
+    
     public void closeCache() throws HyracksDataException {
         if (runFileWriter != null) {
             runFileWriter.close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 4e9376d..3afd08b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -120,6 +119,12 @@
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
     
+    	//Flags added for test purpose
+    private static boolean skipInMemoryHJ = false;
+    private static boolean forceNLJ = false;
+    private static boolean forceRR = false;
+    
+    
     private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -305,7 +310,7 @@
                 public void close() throws HyracksDataException {
                     state.hybridHJ.closeBuild();
                     ctx.setStateObject(state);
-                    LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
+                    LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
                 }
 
                 @Override
@@ -414,63 +419,60 @@
                         int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
                         int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                         int beforeMax = (bSize > pSize) ? bSize : pSize;
-                        joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-
+                        joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
                     }
                     writer.close();
                 }
 
                 private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
-                        RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
+                        RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
                     ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     
-                    long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
-                    long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
+                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
                     
-                    LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
-
+                    LOGGER.warning("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+"  - LeftOuter is "+isLeftOuter);
+                                      
                     //Apply in-Mem HJ if possible
-                    if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
-                        int tabSize = -1;
-                        
-                        if (isLeftOuter || buildPartSize < probePartSize) {
-                            tabSize = ohhj.getBuildPartitionSizeInTup(pid);
-                           
-                            if (tabSize == 0) {
-                                throw new HyracksDataException(
-                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+                    if(!skipInMemoryHJ){
+                    	if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin && !isLeftOuter)) {
+                            int tabSize = -1;
+                            if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize)) ) {	//Case 1.1 - InMemHJ (wout Role-Reversal)
+                            	LOGGER.warning("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "+level+"]");
+                                tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj.getBuildPartitionSizeInTup(pid);
+                                if (tabSize == 0) {
+                                    throw new HyracksDataException(
+                                            "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+                                }
+                                	//Build Side is smaller
+                                applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+                                        buildSideReader, probeSideReader, false, pid);	//checked-confirmed
+                            } else { //Case 1.2 - InMemHJ with Role Reversal
+                            	LOGGER.warning("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "+level+"]");
+                                tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj.getProbePartitionSizeInTup(pid);
+                                if (tabSize == 0) {
+                                    throw new HyracksDataException(
+                                            "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+                                }
+                                	//Probe Side is smaller
+                                applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+                                        probeSideReader, buildSideReader, true, pid);	//checked-confirmed
                             }
-                          //Build Side is smaller
-                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
-                                    buildSideReader, probeSideReader, false, pid);
-
-                        } 
-                        
-                        else { //Role Reversal
-                            tabSize = ohhj.getProbePartitionSizeInTup(pid);
-                            if (tabSize == 0) {
-                                throw new HyracksDataException(
-                                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
-                            }
-                            //Probe Side is smaller
-                            
-                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
-                                    probeSideReader, buildSideReader, true, pid);
                         }
                     }
                     //Apply (Recursive) HHJ
                     else {
+                    	LOGGER.warning("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
                         OptimizedHybridHashJoin rHHj;
-                        if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
-                        	LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
+                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        	LOGGER.warning("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
-                           
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
-                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
+                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
 
                             buildSideReader.open();
                             rHHj.initBuild();
@@ -495,7 +497,8 @@
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitinStatus();
-                            if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {	//Case 2.1.1 - Keep applying HHJ
+                            	LOGGER.warning("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -504,12 +507,12 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
                                 }
 
-                            } else { //Switch to NLJ (Further recursion seems not to be useful)
-                            	LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                            } else { //Case 2.1.2 - Switch to NLJ
+                            	LOGGER.warning("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+                            	for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                                     
@@ -520,21 +523,21 @@
                                     int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                                     if (isLeftOuter || buildSideInTups < probeSideInTups) {
-                                        applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
-                                                nljComparator0, false);
-                                    } else {
                                         applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
-                                                nljComparator1, false);
+                                                nljComparator0, false); //checked-modified
+                                    } else {
+                                        applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+                                                nljComparator1, true); //checked-modified
                                     }
                                 }
                             }
-                        } else { //Role Reversal (Probe Side is smaller)
-                        	LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
+                        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
+                        	LOGGER.warning("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
                             
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
-                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);	//checked-confirmed
                             rHHj.setIsReversed(true);	//Added to use predicateEvaluator (for inMemoryHashJoin) correctly
 
                             probeSideReader.open();
@@ -557,7 +560,8 @@
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitinStatus();
 
-                            if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {	//Case 2.2.1 - Keep applying HHJ
+                            	LOGGER.warning("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -566,10 +570,10 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true);	//checked-confirmed
                                 }
-                            } else { //Switch to NLJ (Further recursion seems not to be effective)
-                            	LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+                            } else { //Case 2.2.2 - Switch to NLJ
+                            	LOGGER.warning("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
                             	for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -582,10 +586,10 @@
                                     long probeSideSize = rprfw.getFileSize();
                                     if (buildSideSize > probeSideSize) {
                                         applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
-                                                nljComparator1, true);
+                                                nljComparator0, true);	//checked-modified
                                     } else {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
-                                                nljComparator0, true);
+                                                nljComparator1, true);	//checked-modified
                                     }
                                 }
                             }
@@ -599,7 +603,6 @@
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
                         ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
                         throws HyracksDataException {
-                	LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -617,7 +620,7 @@
                     }
                     bReader.close();
                     rPartbuff.clear();
-                    // probe
+                    	// probe
                     pReader.open();
                     while (pReader.nextFrame(rPartbuff)) {
                         joiner.join(rPartbuff, writer);
@@ -630,10 +633,16 @@
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
                         throws HyracksDataException {
-                	
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+                	/*
+                	NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
                             new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
-
+					*/
+                		//changed to take care of LOJ with NLJ
+                	NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
+                	nlj.setIsReversed(reverse);
+                	
+                	
                     ByteBuffer cacheBuff = ctx.allocateFrame();
                     innerReader.open();
                     while (innerReader.nextFrame(cacheBuff)) {
@@ -660,4 +669,16 @@
             return op;
         }
     }
-}
+    
+    public void setSkipInMemHJ(boolean b){
+    	skipInMemoryHJ = b;
+    }
+    
+    public void setForceNLJ(boolean b){
+    	forceNLJ = b;
+    }
+    
+    public void setForceRR(boolean b){
+    	forceRR = (!isLeftOuter && b);
+    }
+}
\ No newline at end of file