Adding hash join logging comments.

commit 513c3a7899dc64af3c3cdec96fad9093a4ca2c5f
Merge: b27e9b5 82609d9
Author: Eldon Carman <ecarm002@ucr.edu>
Date:   Thu Feb 5 12:47:52 2015 -0800

    Adding hash join logging comments.

Change-Id: Iade2c53436e5ae82c31305d6f618c780cd72568b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/219
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 52cbb52..ac45ac3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -16,6 +16,7 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -57,14 +58,22 @@
     private final int aveRecordsPerFrame;
     private final double fudgeFactor;
 
+    private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
+
     public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
-            int memSizeInFrames, int maxInputSize0InFrames, int aveRecordsPerFrame, double fudgeFactor) {
+            int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.memSizeInFrames = memSizeInFrames;
-        this.maxInputBuildSizeInFrames = maxInputSize0InFrames;
+        this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
         this.aveRecordsPerFrame = aveRecordsPerFrame;
         this.fudgeFactor = fudgeFactor;
+
+        LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+                + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
+                + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
+                + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
+                + fudgeFactor + ".");
     }
 
     @Override
@@ -108,10 +117,12 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
-        
-        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
-        IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
-        
+
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context
+                .getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
+                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -141,7 +152,8 @@
                         }
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, nullWriterFactories);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true,
+                                nullWriterFactories);
                         break;
                     }
                     default: {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8ec3669..961506b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -439,7 +439,8 @@
                     return;
                 }
                 case CLUSTER_SHUTDOWN: {
-                    workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this, new IPCResponder<Boolean>(handle,mid)));
+                    workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this,
+                            new IPCResponder<Boolean>(handle, mid)));
                     return;
                 }
             }
@@ -625,10 +626,11 @@
         deploymentRunMap.remove(deploymentKey);
     }
 
-    public synchronized void setShutdownRun(ShutdownRun sRun){
+    public synchronized void setShutdownRun(ShutdownRun sRun) {
         shutdownCallback = sRun;
     }
-    public synchronized ShutdownRun getShutdownRun(){
+
+    public synchronized ShutdownRun getShutdownRun() {
         return shutdownCallback;
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 190fd28..7adc240 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -119,7 +119,7 @@
     @Override
     public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
-            LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+            LOGGER.info("Reporting partition failure: JobId: " + jobId + " ResultSetId: " + rsId + " partition: "
                     + partition);
             ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 8887b82..910edc7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -501,13 +501,14 @@
                         } else {
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
+                        ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                         for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
                             RunFileWriter buildWriter = buildWriters[partitionid];
                             RunFileWriter probeWriter = probeWriters[partitionid];
                             if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
-                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                            table.reset();
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
                                     hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 86d738f..860cdd4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -18,6 +18,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +34,8 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
 public class InMemoryHashJoin {
-	
-	private final IHyracksTaskContext ctx;
+
+    private final IHyracksTaskContext ctx;
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
@@ -51,11 +52,14 @@
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
 
+    private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
+
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
             ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
-        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval, false);
+        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval,
+                false);
     }
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
@@ -63,7 +67,7 @@
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
             ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
         this.ctx = ctx;
-    	this.tableSize = tableSize;
+        this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
         buffers = new ArrayList<ByteBuffer>();
@@ -89,6 +93,8 @@
             nullTupleBuild = null;
         }
         reverseOutputOrder = reverse;
+        LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID "
+                + Thread.currentThread().getId() + ".");
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,9 +114,9 @@
         accessorProbe.reset(buffer);
         int tupleCount0 = accessorProbe.getTupleCount();
         for (int i = 0; i < tupleCount0; ++i) {
-        	boolean matchFound = false;
-        	if(tableSize != 0){
-        		int entry = tpcProbe.partition(accessorProbe, i, tableSize);
+            boolean matchFound = false;
+            if (tableSize != 0) {
+                int entry = tpcProbe.partition(accessorProbe, i, tableSize);
                 int offset = 0;
                 do {
                     table.getTuplePointer(entry, offset++, storedTuplePointer);
@@ -121,14 +127,14 @@
                     accessorBuild.reset(buffers.get(bIndex));
                     int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                    	boolean predEval = evaluatePredicate(i, tIndex);
-                    	if(predEval){
-                    		matchFound = true;
+                        boolean predEval = evaluatePredicate(i, tIndex);
+                        if (predEval) {
+                            matchFound = true;
                             appendToResult(i, tIndex, writer);
-                    	}
+                        }
                     }
                 } while (true);
-        	}
+            }
             if (!matchFound && isLeftOuter) {
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
@@ -151,6 +157,8 @@
         int nFrames = buffers.size();
         buffers.clear();
         ctx.deallocateFrames(nFrames);
+        LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+                + Thread.currentThread().getId() + ".");
     }
 
     private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -160,14 +168,13 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
-    
-    private boolean evaluatePredicate(int tIx1, int tIx2){
-    	if(reverseOutputOrder){		//Role Reversal Optimization is triggered
-    		return ( (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1) );
-    	}
-    	else {
-    		return ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2) );
-    	}
+
+    private boolean evaluatePredicate(int tIx1, int tIx2) {
+        if (reverseOutputOrder) { //Role Reversal Optimization is triggered
+            return ((predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1));
+        } else {
+            return ((predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2));
+        }
     }
 
     private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 6bc810e..506da2e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -17,6 +17,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -68,7 +69,7 @@
 
     private RunFileWriter[] buildRFWriters; //writing spilled build partitions
     private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-    
+
     private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final INullWriter[] nullWriters1;
@@ -91,19 +92,22 @@
     private FrameTupleAppender probeTupAppenderToSpilled;
 
     private int numOfSpilledParts;
-    private ByteBuffer[] sPartBuffs;    //Buffers for probe spilled partitions (one buffer per spilled partition)
-    private ByteBuffer probeResBuff;    //Buffer for probe resident partition tuples
-    private ByteBuffer reloadBuffer;    //Buffer for reloading spilled partitions during partition tuning 
-    
+    private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
+    private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
+    private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning 
+
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
-    
-    private boolean isTableEmpty;	//Added for handling the case, where build side is empty (tableSize is 0)
-    private boolean isReversed;		//Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
-    
+
+    private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
+    private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
+
+    private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoin.class.getName());
+
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval) {
+            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
+            IPredicateEvaluator predEval) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
@@ -152,11 +156,11 @@
 
         this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
         this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
-        
+
         this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;
-        
+
         this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
             for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -292,6 +296,9 @@
     }
 
     private void spillPartition(int pid) throws HyracksDataException {
+        LOGGER.fine("OptimizedHybridHashJoin is spilling partition:" + pid + " with " + buildPSizeInFrames[pid]
+                + " frames for Thread ID " + Thread.currentThread().getId() + " (free frames: " + freeFramesCounter
+                + ").");
         int curBuffIx = curPBuff[pid];
         ByteBuffer buff = null;
         while (curBuffIx != END_OF_PARTITION) {
@@ -310,12 +317,16 @@
         }
         curPBuff[pid] = pid;
         pStatus.set(pid);
+        LOGGER.fine("OptimizedHybridHashJoin has freed " + freeFramesCounter + " frames by spilling partition:" + pid
+                + " for Thread ID " + Thread.currentThread().getId() + ".");
     }
 
     private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
         RunFileWriter writer = buildRFWriters[pid];
         if (writer == null) {
             FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel0Name);
+            LOGGER.fine("OptimizedHybridHashJoin is creating a run file (" + file.getFile().getAbsolutePath()
+                    + ") for partition:" + pid + " for Thread ID " + Thread.currentThread().getId() + ".");
             writer = new RunFileWriter(file, ctx.getIOManager());
             writer.open();
             buildRFWriters[pid] = writer;
@@ -355,16 +366,23 @@
         partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
 
         int inMemTupCount = 0;
+        int inMemFrameCount = 0;
+        int spilledFrameCount = 0;
         numOfSpilledParts = 0;
 
         for (int i = 0; i < numOfPartitions; i++) {
             if (!pStatus.get(i)) {
                 inMemTupCount += buildPSizeInTups[i];
+                inMemFrameCount += buildPSizeInFrames[i];
             } else {
+                spilledFrameCount += buildPSizeInFrames[i];
                 numOfSpilledParts++;
             }
         }
 
+        LOGGER.fine("OptimizedHybridHashJoin build phase has spilled " + numOfSpilledParts + " of " + numOfPartitions
+                + " partitions for Thread ID " + Thread.currentThread().getId() + ". (" + inMemFrameCount
+                + " in-memory frames, " + spilledFrameCount + " spilled frames)");
         createInMemoryJoiner(inMemTupCount);
         cacheInMemJoin();
         this.isTableEmpty = (inMemTupCount == 0);
@@ -499,14 +517,14 @@
             inMemJoiner.join(buffer, writer);
             return;
         }
-
+        ByteBuffer buff = null;
         for (int i = 0; i < tupleCount; ++i) {
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
 
             if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
                 if (pStatus.get(pid)) { //pid is Spilled
                     boolean needToClear = false;
-                    ByteBuffer buff = sPartBuffs[curPBuff[pid]];
+                    buff = sPartBuffs[curPBuff[pid]];
                     while (true) {
                         probeTupAppenderToSpilled.reset(buff, needToClear);
                         if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
@@ -537,8 +555,9 @@
         inMemJoiner.join(probeResBuff, writer);
         inMemJoiner.closeJoin(writer);
 
+        ByteBuffer buff = null;
         for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
-            ByteBuffer buff = sPartBuffs[curPBuff[pid]];
+            buff = sPartBuffs[curPBuff[pid]];
             accessorProbe.reset(buff);
             if (accessorProbe.getTupleCount() > 0) {
                 probeWrite(pid, buff);
@@ -609,7 +628,7 @@
         return max;
     }
 
-    public BitSet getPartitinStatus() {
+    public BitSet getPartitionStatus() {
         return pStatus;
     }
 
@@ -642,8 +661,8 @@
     public boolean isTableEmpty() {
         return this.isTableEmpty;
     }
-    
-    public void setIsReversed(boolean b){
-    	this.isReversed = b;
+
+    public void setIsReversed(boolean b) {
+        this.isReversed = b;
     }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 5cab373..540c31b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -298,6 +298,8 @@
                     }
 
                     state.hybridHJ.initBuild();
+                    LOGGER.fine("OptimizedHybridHashJoin is starting the build phase with " + state.numOfPartitions
+                            + " partitions using " + state.memForJoin + " frames for memory.");
                 }
 
                 @Override
@@ -384,6 +386,7 @@
                     writer.open();
                     state.hybridHJ.initProbe();
 
+                    LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
                 }
 
                 @Override
@@ -398,10 +401,9 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-
                     state.hybridHJ.closeProbe(writer);
 
-                    BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
+                    BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
                     hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
                             .createPartitioner(0);
                     hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
@@ -422,6 +424,7 @@
                         joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
                     }
                     writer.close();
+                    LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                 }
 
                 private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
@@ -437,9 +440,10 @@
                     long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
                             .getProbePartitionSize(pid) / ctx.getFrameSize());
 
-                    LOGGER.fine("\n>>>Joining Partition Pairs (pid " + pid + ") - (level " + level + ") - wasReversed "
-                            + wasReversed + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
-                            + " - MemForJoin " + (state.memForJoin) + "  - LeftOuter is " + isLeftOuter);
+                    LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
+                            + ") (pid " + pid + ") - (level " + level + ") - wasReversed " + wasReversed
+                            + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin "
+                            + (state.memForJoin) + "  - LeftOuter is " + isLeftOuter);
 
                     //Apply in-Mem HJ if possible
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
@@ -506,7 +510,7 @@
                             int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
                                     : maxAfterProbeSize;
 
-                            BitSet rPStatus = rHHj.getPartitinStatus();
+                            BitSet rPStatus = rHHj.getPartitionStatus();
                             if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                         + level + "]");
@@ -571,7 +575,7 @@
                             int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
                             int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
                                     : maxAfterProbeSize;
-                            BitSet rPStatus = rHHj.getPartitinStatus();
+                            BitSet rPStatus = rHHj.getPartitionStatus();
 
                             if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "