Fix for Role Reversal Optimization in Optimized HHJ
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2933 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
private final ISerializableTable table;
private final int tableSize;
private final TuplePointer storedTuplePointer;
+ private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
throws HyracksDataException {
+ this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+ }
+
+ public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+ ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table, boolean reverse) throws HyracksDataException {
this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
} else {
nullTupleBuild = null;
}
+ reverseOutputOrder = reverse;
}
public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
matchFound = true;
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- throw new IllegalStateException();
- }
- }
+ appendToResult(i, tIndex, writer);
}
} while (true);
if (!matchFound && isLeftOuter) {
- if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
throw new IllegalStateException();
}
}
+
}
}
}
@@ -145,23 +148,25 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
-
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 8;
-
- long[] pointers;
- int size;
-
- Link() {
- pointers = new long[INIT_POINTERS_SIZE];
- size = 0;
- }
-
- void add(long pointer) {
- if (size >= pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
+
+ private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException{
+ if(!reverseOutputOrder){
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ throw new IllegalStateException();
+ }
}
- pointers[size++] = pointer;
- }
+ }
+ else{
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 19479df..cf39416 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
+
}
@@ -254,6 +256,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
@@ -421,34 +424,40 @@
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
int tabSize = -1;
- if (buildPartSize < probePartSize) {
+
+ if (isLeftOuter || buildPartSize < probePartSize) {
tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
- //Build Side is smaller
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
- buildSideReader, probeSideReader);
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid);
- } else { //Role Reversal
+ }
+
+ else { //Role Reversal
tabSize = ohhj.getProbePartitionSizeInTup(pid);
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
//Probe Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
- probeSideReader, buildSideReader);
+
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
OptimizedHybridHashJoin rHHj;
- if (buildPartSize < probePartSize) { //Build Side is smaller
+ if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
@@ -491,14 +500,14 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
- if (buildSideInTups < probeSideInTups) {
+ if (isLeftOuter || buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
nljComparator0);
} else {
@@ -510,6 +519,7 @@
} else { //Role Reversal (Probe Side is smaller)
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
@@ -548,7 +558,7 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
@@ -572,14 +582,14 @@
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
- ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+ ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
- isLeftOuter, nullWriters1, table);
+ isLeftOuter, nullWriters1, table, reverse);
bReader.open();
rPartbuff.clear();