Merge branch 'master' into pouria/fix-OptzHHJ
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index d02d65c..60e9c40 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -119,7 +119,7 @@
accessorBuild.reset(buffers.get(bIndex));
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- boolean predEval = ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, i, accessorBuild, tIndex) );
+ boolean predEval = evaluatePredicate(i, tIndex);
if(predEval){
matchFound = true;
appendToResult(i, tIndex, writer);
@@ -155,6 +155,15 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
+
+ private boolean evaluatePredicate(int tIx1, int tIx2){
+ if(reverseOutputOrder){ //Role Reversal Optimization is triggered
+ return ( (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1) );
+ }
+ else {
+ return ( (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2) );
+ }
+ }
private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
if (!reverseOutputOrder) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 2f719fa..979ef59 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -49,6 +49,8 @@
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuilder;
private final IPredicateEvaluator predEvaluator;
+ private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
@@ -63,6 +65,7 @@
this.outBuffers = new ArrayList<ByteBuffer>();
this.memSize = memSize;
this.predEvaluator = predEval;
+ this.isReversed = false;
this.ctx = ctx;
this.isLeftOuter = isLeftOuter;
@@ -133,7 +136,7 @@
boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
int c = compare(accessorOuter, i, accessorInner, j);
- boolean prdEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+ boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
@@ -165,6 +168,15 @@
}
}
}
+
+ private boolean evaluatePredicate(int tIx1, int tIx2){
+ if(isReversed){ //Role Reversal Optimization is triggered
+ return ( (predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1) );
+ }
+ else {
+ return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
+ }
+ }
public void closeCache() throws HyracksDataException {
if (runFileWriter != null) {
@@ -206,4 +218,8 @@
}
return 0;
}
+
+ public void setIsReversed(boolean b){
+ this.isReversed = b;
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index cd32c81..6bc810e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -99,6 +99,7 @@
private int freeFramesCounter; //Used for partition tuning
private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
+ private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -125,6 +126,7 @@
this.predEvaluator = predEval;
this.isLeftOuter = false;
this.nullWriters1 = null;
+ this.isReversed = false;
}
@@ -153,7 +155,8 @@
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
-
+ this.isReversed = false;
+
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -441,7 +444,7 @@
this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
- comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
}
private void cacheInMemJoin() throws HyracksDataException {
@@ -639,4 +642,8 @@
public boolean isTableEmpty() {
return this.isTableEmpty;
}
+
+ public void setIsReversed(boolean b){
+ this.isReversed = b;
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 95b7a3c..4e9376d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,6 +19,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -117,6 +119,8 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
+
+ private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
@@ -139,8 +143,6 @@
this.predEvaluatorFactory = predEvaluatorFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
-
-
}
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -207,7 +209,7 @@
}
public static class BuildAndPartitionTaskState extends AbstractStateObject {
-
+
private int memForJoin;
private int numOfPartitions;
private OptimizedHybridHashJoin hybridHJ;
@@ -303,6 +305,7 @@
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
+ LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
}
@Override
@@ -323,7 +326,7 @@
* Hybrid Hash Join recursively on them.
*/
private class ProbeAndJoinActivityNode extends AbstractActivityNode {
-
+
private static final long serialVersionUID = 1L;
private final ActivityId buildAid;
@@ -423,9 +426,11 @@
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
-
+
long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+
+ LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
@@ -460,7 +465,7 @@
else {
OptimizedHybridHashJoin rHHj;
if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
-
+ LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
@@ -503,6 +508,7 @@
}
} else { //Switch to NLJ (Further recursion seems not to be useful)
+ LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -515,19 +521,21 @@
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
- nljComparator0);
+ nljComparator0, false);
} else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
- nljComparator1);
+ nljComparator1, false);
}
}
}
} else { //Role Reversal (Probe Side is smaller)
+ LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+ rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
probeSideReader.open();
rHHj.initBuild();
@@ -561,7 +569,8 @@
joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
}
} else { //Switch to NLJ (Further recursion seems not to be effective)
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -573,10 +582,10 @@
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
- nljComparator1);
+ nljComparator1, true);
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
- nljComparator0);
+ nljComparator0, true);
}
}
}
@@ -590,7 +599,7 @@
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
-
+ LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -619,9 +628,9 @@
}
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
- RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
+ RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
throws HyracksDataException {
-
+
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);