Changes to fix memory leak in Join operators
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 979ef59..994b0ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -139,16 +139,7 @@
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i)
- + accessorInner.getTupleEndOffset(j) - accessorInner.getTupleStartOffset(j);
- throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ appendToResults(i, j, writer);
}
}
@@ -177,7 +168,35 @@
return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
}
}
-
+
+ /**
+  * Emits the joined (outer, inner) tuple pair into the output frame, flushing once if needed.
+  * Under role reversal (isReversed) the inner tuple's fields are written before the outer's.
+  * @throws HyracksDataException if the pair still cannot be appended after the flush and reset
+  */
+ private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException{
+     if (concatIntoFrame(outerTupleId, innerTupleId)) {
+         return;
+     }
+     //First attempt failed: flush the output frame, reset the appender and retry exactly once
+     flushFrame(outBuffer, writer);
+     appender.reset(outBuffer, true);
+     if (concatIntoFrame(outerTupleId, innerTupleId)) {
+         return;
+     }
+     int outerLen = accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId);
+     int innerLen = accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId);
+     throw new HyracksDataException("Record size (" + (outerLen + innerLen) + ") larger than frame size ("
+             + appender.getBuffer().capacity() + ")");
+ }
+
+ //One append attempt; the argument order honors the Role Reversal Optimization flag
+ private boolean concatIntoFrame(int outerTupleId, int innerTupleId) throws HyracksDataException {
+     return isReversed
+             ? appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)
+             : appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId);
+ }
+
public void closeCache() throws HyracksDataException {
if (runFileWriter != null) {
runFileWriter.close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 4e9376d..3afd08b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -120,6 +119,12 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
+ //Test-only flags that steer the join strategy chosen in joinPartitionPair; static, so they are shared by all instances
+ private static boolean skipInMemoryHJ = false;
+ private static boolean forceNLJ = false;
+ private static boolean forceRR = false;
+
+
private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -305,7 +310,7 @@
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
+ LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
}
@Override
@@ -414,63 +419,60 @@
int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
int beforeMax = (bSize > pSize) ? bSize : pSize;
- joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-
+ joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
}
writer.close();
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
- RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
+ RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
- long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
- long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+ long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
+ long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
- LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
-
+ LOGGER.warning("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+" - LeftOuter is "+isLeftOuter);
+
//Apply in-Mem HJ if possible
- if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
- int tabSize = -1;
-
- if (isLeftOuter || buildPartSize < probePartSize) {
- tabSize = ohhj.getBuildPartitionSizeInTup(pid);
-
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ if(!skipInMemoryHJ){
+ if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin && !isLeftOuter)) {
+ int tabSize = -1;
+ if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize)) ) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+ LOGGER.warning("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj.getBuildPartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid); //checked-confirmed
+ } else { //Case 1.2 - InMemHJ with Role Reversal
+ LOGGER.warning("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj.getProbePartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Probe Side is smaller
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid); //checked-confirmed
}
- //Build Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
- buildSideReader, probeSideReader, false, pid);
-
- }
-
- else { //Role Reversal
- tabSize = ohhj.getProbePartitionSizeInTup(pid);
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
- }
- //Probe Side is smaller
-
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
- probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
+ LOGGER.warning("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
OptimizedHybridHashJoin rHHj;
- if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
+ if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+ LOGGER.warning("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
-
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
buildSideReader.open();
rHHj.initBuild();
@@ -495,7 +497,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ LOGGER.warning("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -504,12 +507,12 @@
continue;
}
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be useful)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ } else { //Case 2.1.2 - Switch to NLJ
+ LOGGER.warning("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -520,21 +523,21 @@
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
- applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
- nljComparator0, false);
- } else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
- nljComparator1, false);
+ nljComparator0, false); //checked-modified
+ } else {
+ applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+ nljComparator1, true); //checked-modified
}
}
}
- } else { //Role Reversal (Probe Side is smaller)
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
+ } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
+ LOGGER.warning("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator); //checked-confirmed
rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
probeSideReader.open();
@@ -557,7 +560,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ LOGGER.warning("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -566,10 +570,10 @@
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be effective)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+ } else { //Case 2.2.2 - Switch to NLJ
+ LOGGER.warning("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -582,10 +586,10 @@
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
- nljComparator1, true);
+ nljComparator0, true); //checked-modified
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
- nljComparator0, true);
+ nljComparator1, true); //checked-modified
}
}
}
@@ -599,7 +603,6 @@
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
- LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -617,7 +620,7 @@
}
bReader.close();
rPartbuff.clear();
- // probe
+ // probe
pReader.open();
while (pReader.nextFrame(rPartbuff)) {
joiner.join(rPartbuff, writer);
@@ -630,10 +633,16 @@
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
throws HyracksDataException {
-
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ /*
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
-
+ */
+ //changed to take care of LOJ with NLJ
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
+ nlj.setIsReversed(reverse);
+
+
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -660,4 +669,16 @@
return op;
}
}
-}
+
+ public void setSkipInMemHJ(boolean b){ //Test hook: while true, joinPartitionPair does not enter its in-memory hash join branch
+ skipInMemoryHJ = b; //NOTE(review): the field is static, so this instance setter changes the flag class-wide
+ }
+
+ public void setForceNLJ(boolean b){ //Test hook: while true, the recursive hybrid hash join takes its switch-to-NLJ branch instead of recursing
+ forceNLJ = b; //NOTE(review): the field is static, so this instance setter changes the flag class-wide
+ }
+
+ public void setForceRR(boolean b){ //Test hook: requests the role-reversal branches of joinPartitionPair
+ forceRR = (!isLeftOuter && b); //a left-outer join always stores false; NOTE(review): static field written from an instance method
+ }
+}
\ No newline at end of file