Merge branch 'master' into pouria/fix-OptzHHJ
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index d02d65c..60e9c40 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -119,7 +119,7 @@
                     accessorBuild.reset(buffers.get(bIndex));
                     int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                    	boolean predEval = ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, i, accessorBuild, tIndex) );
+                    	boolean predEval = evaluatePredicate(i, tIndex);
                     	if(predEval){
                     		matchFound = true;
                             appendToResult(i, tIndex, writer);
@@ -155,6 +155,15 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
+    
+    private boolean evaluatePredicate(int tIx1, int tIx2){	//tIx1 indexes accessorProbe, tIx2 indexes accessorBuild
+    	if (predEvaluator == null) {	//No predicate evaluator configured: every pair passes
+    		return true;
+    	}
+    	//With reverseOutputOrder set (role reversal), the build-side tuple is passed first
+    	return reverseOutputOrder ? predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1)
+    			: predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2);
+    }
 
     private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
         if (!reverseOutputOrder) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 2f719fa..979ef59 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -49,6 +49,8 @@
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
+    private boolean isReversed;		//True when role reversal (caused by recursive calls in OptimizedHybridHashJoin) applies, so the predicate evaluator must be called with its two sides swapped
+
     
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
             ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
@@ -63,6 +65,7 @@
         this.outBuffers = new ArrayList<ByteBuffer>();
         this.memSize = memSize;
         this.predEvaluator = predEval;
+        this.isReversed = false;
         this.ctx = ctx;
 
         this.isLeftOuter = isLeftOuter;
@@ -133,7 +136,7 @@
             boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
                 int c = compare(accessorOuter, i, accessorInner, j);
-                boolean prdEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+                boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
                 	matchFound = true;
                     if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
@@ -165,6 +168,15 @@
             }
         }
     }
+    
+    private boolean evaluatePredicate(int tIx1, int tIx2){	//tIx1 indexes accessorOuter, tIx2 indexes accessorInner
+    	if (predEvaluator == null) {	//No predicate evaluator configured: every pair passes
+    		return true;
+    	}
+    	//With isReversed set (role reversal), the inner-side tuple is passed first
+    	return isReversed ? predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1)
+    			: predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2);
+    }
 
     public void closeCache() throws HyracksDataException {
         if (runFileWriter != null) {
@@ -206,4 +218,8 @@
         }
         return 0;
     }
+    
+    public void setIsReversed(boolean b){	//Stores the role-reversal flag that evaluatePredicate reads to pick the argument order for the predicate evaluator
+    	isReversed = b;
+    }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index cd32c81..6bc810e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -99,6 +99,7 @@
     private int freeFramesCounter; //Used for partition tuning
     
     private boolean isTableEmpty;	//Added for handling the case, where build side is empty (tableSize is 0)
+    private boolean isReversed;		//True when a recursive call reversed the build/probe roles; handed to InMemoryHashJoin so the predicate evaluator is called with the correct argument order
     
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -125,6 +126,7 @@
         this.predEvaluator = predEval;
         this.isLeftOuter = false;
         this.nullWriters1 = null;
+        this.isReversed = false;
 
     }
 
@@ -153,7 +155,8 @@
         
         this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
-
+        this.isReversed = false;
+        
         this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -441,7 +444,7 @@
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
                 new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
                         ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
-                        comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+                        comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
@@ -639,4 +642,8 @@
     public boolean isTableEmpty() {
         return this.isTableEmpty;
     }
+    
+    public void setIsReversed(boolean b){	//Stores the role-reversal flag; it is forwarded to the InMemoryHashJoin that this class constructs
+    	isReversed = b;
+    }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 95b7a3c..4e9376d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,6 +19,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -117,6 +119,8 @@
     
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
+    
+    private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
@@ -139,8 +143,6 @@
         this.predEvaluatorFactory = predEvaluatorFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
-        
-
     }
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -207,7 +209,7 @@
     }
 
     public static class BuildAndPartitionTaskState extends AbstractStateObject {
-
+    	
         private int memForJoin;
         private int numOfPartitions;
         private OptimizedHybridHashJoin hybridHJ;
@@ -303,6 +305,7 @@
                 public void close() throws HyracksDataException {
                     state.hybridHJ.closeBuild();
                     ctx.setStateObject(state);
+                    LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
                 }
 
                 @Override
@@ -323,7 +326,7 @@
      * Hybrid Hash Join recursively on them.
      */
     private class ProbeAndJoinActivityNode extends AbstractActivityNode {
-
+    	
         private static final long serialVersionUID = 1L;
 
         private final ActivityId buildAid;
@@ -423,9 +426,11 @@
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
-
+                    
                     long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
                     long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+                    
+                    LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
 
                     //Apply in-Mem HJ if possible
                     if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
@@ -460,7 +465,7 @@
                     else {
                         OptimizedHybridHashJoin rHHj;
                         if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
-
+                        	LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
                            
@@ -503,6 +508,7 @@
                                 }
 
                             } else { //Switch to NLJ (Further recursion seems not to be useful)
+                            	LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -515,19 +521,21 @@
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                                     if (isLeftOuter || buildSideInTups < probeSideInTups) {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
-                                                nljComparator0);
+                                                nljComparator0, false);
                                     } else {
                                         applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
-                                                nljComparator1);
+                                                nljComparator1, false);
                                     }
                                 }
                             }
                         } else { //Role Reversal (Probe Side is smaller)
+                        	LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
                             
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
                                     buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+                            rHHj.setIsReversed(true);	//Added to use predicateEvaluator (for inMemoryHashJoin) correctly
 
                             probeSideReader.open();
                             rHHj.initBuild();
@@ -561,7 +569,8 @@
                                     joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
                                 }
                             } else { //Switch to NLJ (Further recursion seems not to be effective)
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                            	LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+                            	for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                                     
@@ -573,10 +582,10 @@
                                     long probeSideSize = rprfw.getFileSize();
                                     if (buildSideSize > probeSideSize) {
                                         applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
-                                                nljComparator1);
+                                                nljComparator1, true);
                                     } else {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
-                                                nljComparator0);
+                                                nljComparator0, true);
                                     }
                                 }
                             }
@@ -590,7 +599,7 @@
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
                         ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
                         throws HyracksDataException {
-
+                	LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -619,9 +628,9 @@
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
-                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
+                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
                         throws HyracksDataException {
-
+                	
                     NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
                             new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);