Adding hash join logging comments.
commit 513c3a7899dc64af3c3cdec96fad9093a4ca2c5f
Merge: b27e9b5 82609d9
Author: Eldon Carman <ecarm002@ucr.edu>
Date: Thu Feb 5 12:47:52 2015 -0800
Adding hash join logging comments.
Change-Id: Iade2c53436e5ae82c31305d6f618c780cd72568b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/219
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 52cbb52..ac45ac3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -16,6 +16,7 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -57,14 +58,22 @@
private final int aveRecordsPerFrame;
private final double fudgeFactor;
+ private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
+
public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
- int memSizeInFrames, int maxInputSize0InFrames, int aveRecordsPerFrame, double fudgeFactor) {
+ int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
this.memSizeInFrames = memSizeInFrames;
- this.maxInputBuildSizeInFrames = maxInputSize0InFrames;
+ this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
this.aveRecordsPerFrame = aveRecordsPerFrame;
this.fudgeFactor = fudgeFactor;
+
+ LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
+ + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
+ + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
+ + fudgeFactor + ".");
}
@Override
@@ -108,10 +117,12 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
-
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
-
+
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context
+ .getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
+ : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -141,7 +152,8 @@
}
opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, nullWriterFactories);
+ hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true,
+ nullWriterFactories);
break;
}
default: {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8ec3669..961506b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -439,7 +439,8 @@
return;
}
case CLUSTER_SHUTDOWN: {
- workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this, new IPCResponder<Boolean>(handle,mid)));
+ workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this,
+ new IPCResponder<Boolean>(handle, mid)));
return;
}
}
@@ -625,10 +626,11 @@
deploymentRunMap.remove(deploymentKey);
}
- public synchronized void setShutdownRun(ShutdownRun sRun){
+ public synchronized void setShutdownRun(ShutdownRun sRun) {
shutdownCallback = sRun;
}
- public synchronized ShutdownRun getShutdownRun(){
+
+ public synchronized ShutdownRun getShutdownRun() {
return shutdownCallback;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 190fd28..7adc240 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -119,7 +119,7 @@
@Override
public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
- LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+ LOGGER.info("Reporting partition failure: JobId: " + jobId + " ResultSetId: " + rsId + " partition: "
+ partition);
ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
} catch (Exception e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 8887b82..910edc7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -501,13 +501,14 @@
} else {
tableSize = (int) (memsize * recordsPerFrame * factor);
}
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
RunFileWriter buildWriter = buildWriters[partitionid];
RunFileWriter probeWriter = probeWriters[partitionid];
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+ table.reset();
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 86d738f..860cdd4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -18,6 +18,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,8 +34,8 @@
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
public class InMemoryHashJoin {
-
- private final IHyracksTaskContext ctx;
+
+ private final IHyracksTaskContext ctx;
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
@@ -51,11 +52,14 @@
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
private final IPredicateEvaluator predEvaluator;
+ private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
+
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
- this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval, false);
+ this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval,
+ false);
}
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
@@ -63,7 +67,7 @@
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
this.ctx = ctx;
- this.tableSize = tableSize;
+ this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<ByteBuffer>();
@@ -89,6 +93,8 @@
nullTupleBuild = null;
}
reverseOutputOrder = reverse;
+ LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID "
+ + Thread.currentThread().getId() + ".");
}
public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,9 +114,9 @@
accessorProbe.reset(buffer);
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
- boolean matchFound = false;
- if(tableSize != 0){
- int entry = tpcProbe.partition(accessorProbe, i, tableSize);
+ boolean matchFound = false;
+ if (tableSize != 0) {
+ int entry = tpcProbe.partition(accessorProbe, i, tableSize);
int offset = 0;
do {
table.getTuplePointer(entry, offset++, storedTuplePointer);
@@ -121,14 +127,14 @@
accessorBuild.reset(buffers.get(bIndex));
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- boolean predEval = evaluatePredicate(i, tIndex);
- if(predEval){
- matchFound = true;
+ boolean predEval = evaluatePredicate(i, tIndex);
+ if (predEval) {
+ matchFound = true;
appendToResult(i, tIndex, writer);
- }
+ }
}
} while (true);
- }
+ }
if (!matchFound && isLeftOuter) {
if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
@@ -151,6 +157,8 @@
int nFrames = buffers.size();
buffers.clear();
ctx.deallocateFrames(nFrames);
+ LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+ + Thread.currentThread().getId() + ".");
}
private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -160,14 +168,13 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
-
- private boolean evaluatePredicate(int tIx1, int tIx2){
- if(reverseOutputOrder){ //Role Reversal Optimization is triggered
- return ( (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1) );
- }
- else {
- return ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2) );
- }
+
+ private boolean evaluatePredicate(int tIx1, int tIx2) {
+ if (reverseOutputOrder) { //Role Reversal Optimization is triggered
+ return ((predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1));
+ } else {
+ return ((predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2));
+ }
}
private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 6bc810e..506da2e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -68,7 +69,7 @@
private RunFileWriter[] buildRFWriters; //writing spilled build partitions
private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+
private final IPredicateEvaluator predEvaluator;
private final boolean isLeftOuter;
private final INullWriter[] nullWriters1;
@@ -91,19 +92,22 @@
private FrameTupleAppender probeTupAppenderToSpilled;
private int numOfSpilledParts;
- private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
- private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
- private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
-
+ private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
+ private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
+ private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
+
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
-
- private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
- private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
-
+
+ private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
+ private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
+
+ private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoin.class.getName());
+
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
- RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval) {
+ RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
+ IPredicateEvaluator predEval) {
this.ctx = ctx;
this.memForJoin = memForJoin;
this.buildRd = buildRd;
@@ -152,11 +156,11 @@
this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
-
+
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
this.isReversed = false;
-
+
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -292,6 +296,9 @@
}
private void spillPartition(int pid) throws HyracksDataException {
+ LOGGER.fine("OptimizedHybridHashJoin is spilling partition:" + pid + " with " + buildPSizeInFrames[pid]
+ + " frames for Thread ID " + Thread.currentThread().getId() + " (free frames: " + freeFramesCounter
+ + ").");
int curBuffIx = curPBuff[pid];
ByteBuffer buff = null;
while (curBuffIx != END_OF_PARTITION) {
@@ -310,12 +317,16 @@
}
curPBuff[pid] = pid;
pStatus.set(pid);
+ LOGGER.fine("OptimizedHybridHashJoin has freed " + freeFramesCounter + " frames by spilling partition:" + pid
+ + " for Thread ID " + Thread.currentThread().getId() + ".");
}
private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
RunFileWriter writer = buildRFWriters[pid];
if (writer == null) {
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(rel0Name);
+ LOGGER.fine("OptimizedHybridHashJoin is creating a run file (" + file.getFile().getAbsolutePath()
+ + ") for partition:" + pid + " for Thread ID " + Thread.currentThread().getId() + ".");
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
buildRFWriters[pid] = writer;
@@ -355,16 +366,23 @@
partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
int inMemTupCount = 0;
+ int inMemFrameCount = 0;
+ int spilledFrameCount = 0;
numOfSpilledParts = 0;
for (int i = 0; i < numOfPartitions; i++) {
if (!pStatus.get(i)) {
inMemTupCount += buildPSizeInTups[i];
+ inMemFrameCount += buildPSizeInFrames[i];
} else {
+ spilledFrameCount += buildPSizeInFrames[i];
numOfSpilledParts++;
}
}
+ LOGGER.fine("OptimizedHybridHashJoin build phase has spilled " + numOfSpilledParts + " of " + numOfPartitions
+ + " partitions for Thread ID " + Thread.currentThread().getId() + ". (" + inMemFrameCount
+ + " in-memory frames, " + spilledFrameCount + " spilled frames)");
createInMemoryJoiner(inMemTupCount);
cacheInMemJoin();
this.isTableEmpty = (inMemTupCount == 0);
@@ -499,14 +517,14 @@
inMemJoiner.join(buffer, writer);
return;
}
-
+ ByteBuffer buff = null;
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
if (pStatus.get(pid)) { //pid is Spilled
boolean needToClear = false;
- ByteBuffer buff = sPartBuffs[curPBuff[pid]];
+ buff = sPartBuffs[curPBuff[pid]];
while (true) {
probeTupAppenderToSpilled.reset(buff, needToClear);
if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
@@ -537,8 +555,9 @@
inMemJoiner.join(probeResBuff, writer);
inMemJoiner.closeJoin(writer);
+ ByteBuffer buff = null;
for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
- ByteBuffer buff = sPartBuffs[curPBuff[pid]];
+ buff = sPartBuffs[curPBuff[pid]];
accessorProbe.reset(buff);
if (accessorProbe.getTupleCount() > 0) {
probeWrite(pid, buff);
@@ -609,7 +628,7 @@
return max;
}
- public BitSet getPartitinStatus() {
+ public BitSet getPartitionStatus() {
return pStatus;
}
@@ -642,8 +661,8 @@
public boolean isTableEmpty() {
return this.isTableEmpty;
}
-
- public void setIsReversed(boolean b){
- this.isReversed = b;
+
+ public void setIsReversed(boolean b) {
+ this.isReversed = b;
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 5cab373..540c31b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -298,6 +298,8 @@
}
state.hybridHJ.initBuild();
+ LOGGER.fine("OptimizedHybridHashJoin is starting the build phase with " + state.numOfPartitions
+ + " partitions using " + state.memForJoin + " frames for memory.");
}
@Override
@@ -384,6 +386,7 @@
writer.open();
state.hybridHJ.initProbe();
+ LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
}
@Override
@@ -398,10 +401,9 @@
@Override
public void close() throws HyracksDataException {
-
state.hybridHJ.closeProbe(writer);
- BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
+ BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
.createPartitioner(0);
hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
@@ -422,6 +424,7 @@
joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
}
writer.close();
+ LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
@@ -437,9 +440,10 @@
long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
.getProbePartitionSize(pid) / ctx.getFrameSize());
- LOGGER.fine("\n>>>Joining Partition Pairs (pid " + pid + ") - (level " + level + ") - wasReversed "
- + wasReversed + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
- + " - MemForJoin " + (state.memForJoin) + " - LeftOuter is " + isLeftOuter);
+ LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
+ + ") (pid " + pid + ") - (level " + level + ") - wasReversed " + wasReversed
+ + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin "
+ + (state.memForJoin) + " - LeftOuter is " + isLeftOuter);
//Apply in-Mem HJ if possible
if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
@@ -506,7 +510,7 @@
int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
: maxAfterProbeSize;
- BitSet rPStatus = rHHj.getPartitinStatus();
+ BitSet rPStatus = rHHj.getPartitionStatus();
if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ level + "]");
@@ -571,7 +575,7 @@
int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
: maxAfterProbeSize;
- BitSet rPStatus = rHHj.getPartitinStatus();
+ BitSet rPStatus = rHHj.getPartitionStatus();
if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "