Addressing Review Comments for the Hybrid Hash Join
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@764 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 375f45e..8f2bb81 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -1,6 +1,5 @@
package edu.uci.ics.hyracks.dataflow.std.join;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
@@ -23,119 +22,118 @@
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+/**
+ * @author pouria
+ *         This class mainly applies one level of HHJ on a pair of
+ *         relations. It is always called by the descriptor.
+ */
public class OptimizedHybridHashJoin {
-
- private final int NO_MORE_FREE_BUFFER = -1;
+
+ private final int NO_MORE_FREE_BUFFER = -1;
private final int END_OF_PARTITION = -1;
private final int INVALID_BUFFER = -2;
private final int UNALLOCATED_FRAME = -3;
private final int BUFFER_FOR_RESIDENT_PARTS = -1;
-
- IHyracksTaskContext ctx;
-
- private final String rel0Name;
- private final String rel1Name;
-
- private final int[] buildKeys;
+
+ private IHyracksTaskContext ctx;
+
+ private final String rel0Name;
+ private final String rel1Name;
+
+ private final int[] buildKeys;
private final int[] probeKeys;
-
- final IBinaryComparator[] comparators;
-
- private ITuplePartitionComputer buildHpc;
- private ITuplePartitionComputer probeHpc;
-
- final RecordDescriptor buildRd;
- final RecordDescriptor probeRd;
-
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+
+ private final IBinaryComparator[] comparators;
+
+ private ITuplePartitionComputer buildHpc;
+ private ITuplePartitionComputer probeHpc;
+
+ private final RecordDescriptor buildRd;
+ private final RecordDescriptor probeRd;
+
+ private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+ private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
private final boolean isLeftOuter;
private final INullWriter[] nullWriters1;
-
- private ByteBuffer[] memBuffs; //Memory buffers for build
- private int[] curPBuff; //Current (last) Buffer for each partition
- private int[] nextBuff; //Next buffer in the partition's buffer chain
- private int[] buildPSizeInTups; //Size of build partitions (in tuples)
- private int[] probePSizeInTups; //Size of probe partitions (in tuples)
- private int nextFreeBuffIx; //Index of next available free buffer to allocate/use
- private BitSet pStatus; //0=resident, 1=spilled
- private int numOfPartitions;
+
+ private ByteBuffer[] memBuffs; //Memory buffers for build
+ private int[] curPBuff; //Current (last) Buffer for each partition
+ private int[] nextBuff; //Next buffer in the partition's buffer chain
+ private int[] buildPSizeInTups; //Size of build partitions (in tuples)
+ private int[] probePSizeInTups; //Size of probe partitions (in tuples)
+ private int nextFreeBuffIx; //Index of next available free buffer to allocate/use
+ private BitSet pStatus; //0=resident, 1=spilled
+ private int numOfPartitions;
private int memForJoin;
- private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
-
+ private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
+
private final FrameTupleAccessor accessorBuild;
- private final FrameTupleAccessor accessorProbe;
+ private final FrameTupleAccessor accessorProbe;
private FrameTupleAppender buildTupAppender;
private FrameTupleAppender probeTupAppenderToResident;
private FrameTupleAppender probeTupAppenderToSpilled;
-
+
private int numOfSpilledParts;
- private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
- private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
-
-
- private int[] buildPSizeInFrames; //Used for partition tuning
- private int freeFramesCounter; //Used for partition tuning
-
-
- public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin,
- int numOfPartitions, String rel0Name, String rel1Name,
- int[] keys0, int[] keys1, IBinaryComparator[] comparators,
- RecordDescriptor buildRd, RecordDescriptor probeRd,
- ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc){
- this.ctx = ctx;
- this.memForJoin = memForJoin;
- this.buildRd = buildRd;
- this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
- this.comparators = comparators;
- this.rel0Name = rel0Name;
- this.rel1Name = rel1Name;
+ private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
+ private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
+ private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
- this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
+ private int[] buildPSizeInFrames; //Used for partition tuning
+ private int freeFramesCounter; //Used for partition tuning
- this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
- this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+ public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
+ String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
+ RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
+ this.ctx = ctx;
+ this.memForJoin = memForJoin;
+ this.buildRd = buildRd;
+ this.probeRd = probeRd;
+ this.buildHpc = probeHpc;
+ this.probeHpc = buildHpc;
+ this.buildKeys = keys0;
+ this.probeKeys = keys1;
+ this.comparators = comparators;
+ this.rel0Name = rel0Name;
+ this.rel1Name = rel1Name;
- this.isLeftOuter = false;
- this.nullWriters1 = null;
-
+ this.numOfPartitions = numOfPartitions;
+ this.buildRFWriters = new RunFileWriter[numOfPartitions];
+ this.probeRFWriters = new RunFileWriter[numOfPartitions];
+
+ this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
+ this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+
+ this.isLeftOuter = false;
+ this.nullWriters1 = null;
+
}
-
- public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin,
- int numOfPartitions, String rel0Name, String rel1Name,
- int[] keys0, int[] keys1, IBinaryComparator[] comparators,
- RecordDescriptor buildRd, RecordDescriptor probeRd,
- ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
- boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1){
- this.ctx = ctx;
- this.memForJoin = memForJoin;
- this.buildRd = buildRd;
- this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
- this.comparators = comparators;
- this.rel0Name = rel0Name;
- this.rel1Name = rel1Name;
- this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
+ public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
+ String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
+ RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
+ boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+ this.ctx = ctx;
+ this.memForJoin = memForJoin;
+ this.buildRd = buildRd;
+ this.probeRd = probeRd;
+ this.buildHpc = probeHpc;
+ this.probeHpc = buildHpc;
+ this.buildKeys = keys0;
+ this.probeKeys = keys1;
+ this.comparators = comparators;
+ this.rel0Name = rel0Name;
+ this.rel1Name = rel1Name;
- this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
- this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+ this.numOfPartitions = numOfPartitions;
+ this.buildRFWriters = new RunFileWriter[numOfPartitions];
+ this.probeRFWriters = new RunFileWriter[numOfPartitions];
- this.isLeftOuter = isLeftOuter;
-
-
+ this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
+ this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+
+ this.isLeftOuter = isLeftOuter;
+
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -143,160 +141,148 @@
}
}
}
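+
+    /*
+     * Expected call sequence (as driven by the descriptor): initBuild() once,
+     * build() for every frame of the build relation, closeBuild(), then
+     * initProbe(), probe() for every frame of the probe relation, and finally
+     * closeProbe(). A hypothetical driver loop would look like:
+     *
+     *     join.initBuild();
+     *     while (buildReader.nextFrame(buff)) { join.build(buff); }
+     *     join.closeBuild();
+     *     join.initProbe();
+     *     while (probeReader.nextFrame(buff)) { join.probe(buff, writer); }
+     *     join.closeProbe(writer);
+     */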
-
-
- public void initBuild(){
- memBuffs = new ByteBuffer[memForJoin];
+
+ public void initBuild() {
+ memBuffs = new ByteBuffer[memForJoin];
curPBuff = new int[numOfPartitions];
nextBuff = new int[memForJoin];
pStatus = new BitSet(numOfPartitions);
buildPSizeInTups = new int[numOfPartitions];
-
+
buildPSizeInFrames = new int[numOfPartitions];
freeFramesCounter = memForJoin - numOfPartitions;
-
- for(int i=0; i<numOfPartitions; i++){ //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+
+        for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting it as the head of that partition's buffer chain
memBuffs[i] = ctx.allocateFrame();
curPBuff[i] = i;
nextBuff[i] = -1;
- buildPSizeInFrames[i] = 1; //The dedicated initial buffer
+ buildPSizeInFrames[i] = 1; //The dedicated initial buffer
}
-
- nextFreeBuffIx = ( (numOfPartitions<memForJoin) ? numOfPartitions : -1); //Setting the chain of unallocated frames
- for(int i=numOfPartitions; i<memBuffs.length; i++){
- nextBuff[i] = UNALLOCATED_FRAME;
+
+ nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+ for (int i = numOfPartitions; i < memBuffs.length; i++) {
+ nextBuff[i] = UNALLOCATED_FRAME;
}
-
+
buildTupAppender = new FrameTupleAppender(ctx.getFrameSize());
-
-
+
}
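+
+    /*
+     * Frame-chain bookkeeping on hypothetical numbers: with memForJoin = 5 and
+     * numOfPartitions = 3, initBuild() pins frames 0..2 as the heads of
+     * partitions 0..2 (curPBuff = {0, 1, 2}, nextBuff = {-1, -1, -1, ...}),
+     * sets nextFreeBuffIx = 3, and marks frames 3..4 as UNALLOCATED_FRAME.
+     * A partition's frames are then walked via nextBuff[] until hitting
+     * END_OF_PARTITION.
+     */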
-
+
public void build(ByteBuffer buffer) throws HyracksDataException {
- accessorBuild.reset(buffer);
+ accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
processTuple(i, pid);
buildPSizeInTups[pid]++;
}
-
+
}
-
- private void processTuple(int tid, int pid) throws HyracksDataException{
- ByteBuffer partition = memBuffs[ curPBuff[pid] ]; //Getting current buffer for the target partition
-
- if(!pStatus.get(pid)){ //resident partition
- buildTupAppender.reset(partition, false);
+
+ private void processTuple(int tid, int pid) throws HyracksDataException {
+ ByteBuffer partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
+
+ if (!pStatus.get(pid)) { //resident partition
+ buildTupAppender.reset(partition, false);
while (true) {
- if (!buildTupAppender.append(accessorBuild, tid)) { //partition does not have enough room
- int newBuffIx = allocateFreeBuffer(pid);
- if(newBuffIx == NO_MORE_FREE_BUFFER){ //Spill one partition
- int pidToSpill = selectPartitionToSpill();
- if(pidToSpill != -1){
- spillPartition(pidToSpill);
- buildTupAppender.reset(memBuffs[pidToSpill], true);
- processTuple(tid, pid);
- break;
- }
- else{ //No more partition to spill
- throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
- }
- }
- else{ //New Buffer allocated successfully
- partition = memBuffs[ curPBuff[pid] ];
- buildTupAppender.reset(partition, true);
- if (!buildTupAppender.append(accessorBuild, tid)){
- throw new HyracksDataException("Invalid State (Can not append to newly allocated buffer)");
- }
- else{
- buildPSizeInFrames[pid]++;
- break;
- }
- }
- }
- else { //Tuple added to resident partition successfully
- break;
+ if (buildTupAppender.append(accessorBuild, tid)) { //Tuple added to resident partition successfully
+ break;
}
+ //partition does not have enough room
+ int newBuffIx = allocateFreeBuffer(pid);
+ if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
+ int pidToSpill = selectPartitionToSpill();
+ if (pidToSpill == -1) { //No more partition to spill
+ throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
+ }
+ spillPartition(pidToSpill);
+ buildTupAppender.reset(memBuffs[pidToSpill], true);
+ processTuple(tid, pid);
+ break;
+ } //New Buffer allocated successfully
+            partition = memBuffs[curPBuff[pid]]; //Current buffer for the partition is now updated by the allocateFreeBuffer() call above
+ buildTupAppender.reset(partition, true);
+ if (!buildTupAppender.append(accessorBuild, tid)) {
+                    throw new HyracksDataException("Invalid State (Cannot append to newly allocated buffer)");
+ }
+ buildPSizeInFrames[pid]++;
+ break;
}
- }
- else{ //spilled partition
+ } else { //spilled partition
boolean needClear = false;
while (true) {
- buildTupAppender.reset(partition, needClear);
+ buildTupAppender.reset(partition, needClear);
if (buildTupAppender.append(accessorBuild, tid)) {
break;
- } else {
- buildWrite(pid, partition);
- partition.clear();
- needClear = true;
- buildPSizeInFrames[pid]++;
}
+                //Dedicated in-memory buffer for the partition is full; it needs to be flushed first
+ buildWrite(pid, partition);
+ partition.clear();
+ needClear = true;
+ buildPSizeInFrames[pid]++;
}
}
}
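+
+    /*
+     * Spill path of processTuple(), on a hypothetical fill-up: if partition 2's
+     * current frame is full and allocateFreeBuffer() returns NO_MORE_FREE_BUFFER,
+     * the largest resident partition (say partition 0, holding the most tuples)
+     * is flushed to its run file by spillPartition(), its frames rejoin the free
+     * chain, and the pending tuple is retried through the recursive
+     * processTuple() call.
+     */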
-
+
private int allocateFreeBuffer(int pid) {
- if(nextFreeBuffIx != -1){
- if( memBuffs[nextFreeBuffIx] == null){
+ if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
+ if (memBuffs[nextFreeBuffIx] == null) {
memBuffs[nextFreeBuffIx] = ctx.allocateFrame();
}
int curPartBuffIx = curPBuff[pid];
curPBuff[pid] = nextFreeBuffIx;
int oldNext = nextBuff[nextFreeBuffIx];
nextBuff[nextFreeBuffIx] = curPartBuffIx;
- if(oldNext == UNALLOCATED_FRAME){
+ if (oldNext == UNALLOCATED_FRAME) {
nextFreeBuffIx++;
- if(nextFreeBuffIx == memForJoin){ //No more free buffer
+ if (nextFreeBuffIx == memForJoin) { //No more free buffer
nextFreeBuffIx = NO_MORE_FREE_BUFFER;
}
- }
- else{
+ } else {
nextFreeBuffIx = oldNext;
}
- (memBuffs[ curPBuff[pid] ]).clear();
-
+ (memBuffs[curPBuff[pid]]).clear();
+
freeFramesCounter--;
return (curPBuff[pid]);
- }
- else{
- return NO_MORE_FREE_BUFFER; //A partitions needs to be spilled (if feasible)
+ } else {
+            return NO_MORE_FREE_BUFFER; //A partition needs to be spilled (if feasible)
}
}
-
- private int selectPartitionToSpill(){
+
+ private int selectPartitionToSpill() {
int maxSize = -1;
int partitionToSpill = -1;
- for(int i=0; i<buildPSizeInTups.length; i++){ //Find the largest partition, to spill
- if( !pStatus.get(i) && (buildPSizeInTups[i]>maxSize) ){
+ for (int i = 0; i < buildPSizeInTups.length; i++) { //Find the largest partition, to spill
+ if (!pStatus.get(i) && (buildPSizeInTups[i] > maxSize)) {
maxSize = buildPSizeInTups[i];
partitionToSpill = i;
}
}
return partitionToSpill;
}
-
- private void spillPartition(int pid) throws HyracksDataException{
- int curBuffIx = curPBuff[pid];
- ByteBuffer buff = null;
- while(curBuffIx != END_OF_PARTITION){
+
+ private void spillPartition(int pid) throws HyracksDataException {
+ int curBuffIx = curPBuff[pid];
+ ByteBuffer buff = null;
+ while (curBuffIx != END_OF_PARTITION) {
buff = memBuffs[curBuffIx];
buildWrite(pid, buff);
buff.clear();
-
+
int freedBuffIx = curBuffIx;
curBuffIx = nextBuff[curBuffIx];
-
- if(freedBuffIx != pid){
- nextBuff[freedBuffIx] = nextFreeBuffIx;
- nextFreeBuffIx = freedBuffIx;
- freeFramesCounter++;
+
+ if (freedBuffIx != pid) {
+ nextBuff[freedBuffIx] = nextFreeBuffIx;
+ nextFreeBuffIx = freedBuffIx;
+ freeFramesCounter++;
}
}
- curPBuff[pid] = pid;
- pStatus.set(pid);
+ curPBuff[pid] = pid;
+ pStatus.set(pid);
}
-
+
private void buildWrite(int pid, ByteBuffer buff) throws HyracksDataException {
RunFileWriter writer = buildRFWriters[pid];
if (writer == null) {
@@ -307,348 +293,315 @@
}
writer.nextFrame(buff);
}
-
- public void closeBuild() throws HyracksDataException{
- for(int i=0; i<numOfPartitions; i++){ //Remove Empty Partitions' allocated frame
- if(buildPSizeInTups[i] == 0){
- buildPSizeInFrames[i]--;
- nextBuff[ curPBuff[i] ] = nextFreeBuffIx;
- nextFreeBuffIx = curPBuff[i];
- curPBuff[i] = INVALID_BUFFER;
- freeFramesCounter++;
- }
- }
-
- ByteBuffer buff = null;
- for(int i=pStatus.nextSetBit(0); i>=0; i=pStatus.nextSetBit(i+1)){ //flushing and DeAllocating the dedicated buffers for the spilled partitions
- buff = memBuffs[i];
- accessorBuild.reset(buff);
- if (accessorBuild.getTupleCount() > 0) {
- buildWrite(i, buff);
- buildPSizeInFrames[i]++;
- }
- nextBuff[i] = nextFreeBuffIx;
- nextFreeBuffIx = i;
- freeFramesCounter++;
- curPBuff[i] = INVALID_BUFFER;
-
- if( buildRFWriters[i] != null){
- buildRFWriters[i].close();
- }
- }
-
- partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
-
- int inMemTupCount = 0;
- numOfSpilledParts = 0;
-
- for(int i=0; i<numOfPartitions; i++){
- if(!pStatus.get(i)){
- inMemTupCount += buildPSizeInTups[i];
- }
- else{
- numOfSpilledParts++;
- }
- }
-
-
- createInMemoryJoiner(inMemTupCount);
- cacheInMemJoin();
-
- int _stats_res_empty = 0;
- int _stats_res_full = 0;
- int _stats_spilled = 0;
- int _stats_tups_in_mem = 0;
- int _stats_sum_spilled_size = 0;
-
- for(int i=0; i<numOfPartitions; i++){
- if(!pStatus.get(i)){ //is resident
- if(buildPSizeInTups[i] == 0){ //is empty
- _stats_res_empty++;
- }
- else{
- _stats_res_full++;
- }
- _stats_tups_in_mem += buildPSizeInTups[i];
- }
- else{
- _stats_spilled++;
- _stats_sum_spilled_size += buildPSizeInTups[i];
- }
- }
+
+ public void closeBuild() throws HyracksDataException {
+        for (int i = 0; i < numOfPartitions; i++) { //Remove empty partitions' allocated frames
+ if (buildPSizeInTups[i] == 0) {
+ buildPSizeInFrames[i]--;
+ nextBuff[curPBuff[i]] = nextFreeBuffIx;
+ nextFreeBuffIx = curPBuff[i];
+ curPBuff[i] = INVALID_BUFFER;
+ freeFramesCounter++;
+ }
+ }
+
+ ByteBuffer buff = null;
+        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //Flushing and deallocating the dedicated buffers of the spilled partitions
+ buff = memBuffs[i];
+ accessorBuild.reset(buff);
+ if (accessorBuild.getTupleCount() > 0) {
+ buildWrite(i, buff);
+ buildPSizeInFrames[i]++;
+ }
+ nextBuff[i] = nextFreeBuffIx;
+ nextFreeBuffIx = i;
+ freeFramesCounter++;
+ curPBuff[i] = INVALID_BUFFER;
+
+ if (buildRFWriters[i] != null) {
+ buildRFWriters[i].close();
+ }
+ }
+
+ partitionTune(); //Trying to bring back as many spilled partitions as possible, making them resident
+
+ int inMemTupCount = 0;
+ numOfSpilledParts = 0;
+
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (!pStatus.get(i)) {
+ inMemTupCount += buildPSizeInTups[i];
+ } else {
+ numOfSpilledParts++;
+ }
+ }
+
+ createInMemoryJoiner(inMemTupCount);
+ cacheInMemJoin();
}
-
+
private void partitionTune() throws HyracksDataException {
- ArrayList<Integer> reloadSet = selectPartitionsToReload();
-
- for(int i=0; i<reloadSet.size(); i++){
- int pid = reloadSet.get(i);
- if(buildPSizeInFrames[pid]>0){
- int[] buffsToLoad = new int[ buildPSizeInFrames[pid] ];
- for(int j=0; j<buffsToLoad.length; j++){
- buffsToLoad[j] = nextFreeBuffIx;
- int oldNext = nextBuff[nextFreeBuffIx];
- if(oldNext == UNALLOCATED_FRAME){
- nextFreeBuffIx++;
- if(nextFreeBuffIx == memForJoin){ //No more free buffer
- nextFreeBuffIx = NO_MORE_FREE_BUFFER;
- }
+ reloadBuffer = ctx.allocateFrame();
+ ArrayList<Integer> reloadSet = selectPartitionsToReload();
+ for (int i = 0; i < reloadSet.size(); i++) {
+ int pid = reloadSet.get(i);
+ int[] buffsToLoad = new int[buildPSizeInFrames[pid]];
+ for (int j = 0; j < buffsToLoad.length; j++) {
+ buffsToLoad[j] = nextFreeBuffIx;
+ int oldNext = nextBuff[nextFreeBuffIx];
+ if (oldNext == UNALLOCATED_FRAME) {
+ nextFreeBuffIx++;
+ if (nextFreeBuffIx == memForJoin) { //No more free buffer
+ nextFreeBuffIx = NO_MORE_FREE_BUFFER;
}
- else{
- nextFreeBuffIx = oldNext;
- }
-
-
- }
- curPBuff[pid] = buffsToLoad[0];
- for(int k=1; k<buffsToLoad.length; k++){
- nextBuff[ buffsToLoad[k-1] ] = buffsToLoad[k];
- }
- loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
- }
- }
- reloadSet.clear();
- reloadSet = null;
+ } else {
+ nextFreeBuffIx = oldNext;
+ }
+
+ }
+ curPBuff[pid] = buffsToLoad[0];
+ for (int k = 1; k < buffsToLoad.length; k++) {
+ nextBuff[buffsToLoad[k - 1]] = buffsToLoad[k];
+ }
+ loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
+ }
+ reloadSet.clear();
+ reloadSet = null;
}
-
+
private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
- RunFileReader r = wr.createReader();
+ RunFileReader r = wr.createReader();
r.open();
- ByteBuffer bf = ctx.allocateFrame();
int counter = 0;
ByteBuffer mBuff = null;
- while (r.nextFrame(bf)) {
- mBuff = memBuffs[ buffs[counter] ];
- if(mBuff == null){
- mBuff = ctx.allocateFrame();
- memBuffs[ buffs[counter] ] = mBuff;
- }
- FrameUtils.copy(bf, mBuff );
- counter++;
+ reloadBuffer.clear();
+ while (r.nextFrame(reloadBuffer)) {
+ mBuff = memBuffs[buffs[counter]];
+ if (mBuff == null) {
+ mBuff = ctx.allocateFrame();
+ memBuffs[buffs[counter]] = mBuff;
+ }
+ FrameUtils.copy(reloadBuffer, mBuff);
+ counter++;
+ reloadBuffer.clear();
}
-
- int curNext = nextBuff[ buffs[ buffs.length-1 ] ];
- nextBuff[ buffs[ buffs.length-1 ] ] = END_OF_PARTITION;
+
+ int curNext = nextBuff[buffs[buffs.length - 1]];
+ nextBuff[buffs[buffs.length - 1]] = END_OF_PARTITION;
nextFreeBuffIx = curNext;
-
+
r.close();
pStatus.set(pid, false);
- buildRFWriters[pid] = null;
+ buildRFWriters[pid] = null;
}
-
- private ArrayList<Integer> selectPartitionsToReload(){
- ArrayList<Integer> p = new ArrayList<Integer>();
- for(int i=pStatus.nextSetBit(0); i>=0; i=pStatus.nextSetBit(i+1)){
- if(freeFramesCounter - buildPSizeInFrames[i] >= 0){
- p.add(i);
- freeFramesCounter -= buildPSizeInFrames[i];
- }
- if(freeFramesCounter<1){ //No more free buffer available
- return p;
- }
- }
- return p;
+
+ private ArrayList<Integer> selectPartitionsToReload() {
+ ArrayList<Integer> p = new ArrayList<Integer>();
+ for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) {
+            if (buildPSizeInFrames[i] > 0 && (freeFramesCounter - buildPSizeInFrames[i] >= 0)) {
+ p.add(i);
+ freeFramesCounter -= buildPSizeInFrames[i];
+ }
+ if (freeFramesCounter < 1) { //No more free buffer available
+ return p;
+ }
+ }
+ return p;
}
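+
+    /*
+     * Greedy selection on hypothetical sizes: with freeFramesCounter = 10 and
+     * spilled partitions of 4, 7 and 3 frames (in bit order), the 4-frame
+     * partition fits (6 frames left), the 7-frame partition is skipped, and the
+     * 3-frame partition fits (3 frames left), so only those two are reloaded.
+     */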
-
- private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException{
- ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
- this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
- new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc,
- new FrameTupleAccessor(ctx.getFrameSize(), buildRd), buildHpc,
- new FrameTuplePairComparator(probeKeys, buildKeys, comparators),
- isLeftOuter, nullWriters1, table);
+
+ private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
+ ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
+ this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
+ new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
+ ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
+ comparators), isLeftOuter, nullWriters1, table);
}
-
- private void cacheInMemJoin() throws HyracksDataException{
-
- for(int pid=0; pid<numOfPartitions; pid++){
- if(!pStatus.get(pid) ){
- int nextBuffIx = curPBuff[pid];
- while(nextBuffIx > -1 ){ //It is not Invalid or End_Of_Partition
- inMemJoiner.build(memBuffs[nextBuffIx]);
+
+ private void cacheInMemJoin() throws HyracksDataException {
+
+ for (int pid = 0; pid < numOfPartitions; pid++) {
+ if (!pStatus.get(pid)) {
+ int nextBuffIx = curPBuff[pid];
+                while (nextBuffIx > -1) { //Neither INVALID_BUFFER nor END_OF_PARTITION
+ inMemJoiner.build(memBuffs[nextBuffIx]);
nextBuffIx = nextBuff[nextBuffIx];
}
}
}
}
-
- public void initProbe(){
-
- sPartBuffs = new ByteBuffer[numOfSpilledParts];
- for(int i=0; i<numOfSpilledParts; i++){
- sPartBuffs[i] = ctx.allocateFrame();
- }
- curPBuff = new int[numOfPartitions];
- int nextBuffIxToAlloc = 0;
- for(int i=0; i<numOfPartitions; i++){
- curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
- }
- probePSizeInTups = new int[numOfPartitions];
- probeRFWriters = new RunFileWriter[numOfPartitions];
-
- probeResBuff = ctx.allocateFrame();
-
- probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
- probeTupAppenderToResident.reset(probeResBuff, true);
-
- probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
-
- }
-
- public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException{
-
- accessorProbe.reset(buffer);
- int tupleCount = accessorProbe.getTupleCount();
-
-
- if(numOfSpilledParts == 0){
- inMemJoiner.join(buffer, writer);
- return;
+
+ public void initProbe() {
+
+ sPartBuffs = new ByteBuffer[numOfSpilledParts];
+ for (int i = 0; i < numOfSpilledParts; i++) {
+ sPartBuffs[i] = ctx.allocateFrame();
}
-
+ curPBuff = new int[numOfPartitions];
+ int nextBuffIxToAlloc = 0;
+        /* We only need to allocate one frame per spilled partition.
+         * Resident partitions do not need frames in probe, as their tuples join
+         * immediately with the resident build tuples using the InMemoryHashJoin */
+ for (int i = 0; i < numOfPartitions; i++) {
+ curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
+ }
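+        /*
+         * Hypothetical example: with 5 partitions of which only 1 and 3 spilled,
+         * curPBuff becomes {-1, 0, -1, 1, -1}; a spilled partition stores an
+         * index into sPartBuffs, while resident partitions share probeResBuff
+         * (marked by BUFFER_FOR_RESIDENT_PARTS).
+         */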
+ probePSizeInTups = new int[numOfPartitions];
+ probeRFWriters = new RunFileWriter[numOfPartitions];
+
+ probeResBuff = ctx.allocateFrame();
+
+ probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
+ probeTupAppenderToResident.reset(probeResBuff, true);
+
+ probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
+
+ }
+
+ public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+
+ accessorProbe.reset(buffer);
+ int tupleCount = accessorProbe.getTupleCount();
+
+ if (numOfSpilledParts == 0) {
+ inMemJoiner.join(buffer, writer);
+ return;
+ }
+
for (int i = 0; i < tupleCount; ++i) {
- int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
-
-
- if(buildPSizeInTups[pid]>0){ //Tuple has potential match from previous phase
- if( pStatus.get(pid) ){ //pid is Spilled
+ int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
+
+ if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
+ if (pStatus.get(pid)) { //pid is Spilled
boolean needToClear = false;
- ByteBuffer buff = sPartBuffs[ curPBuff[pid] ];
+ ByteBuffer buff = sPartBuffs[curPBuff[pid]];
while (true) {
- probeTupAppenderToSpilled.reset(buff, needToClear);
+ probeTupAppenderToSpilled.reset(buff, needToClear);
if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
break;
- } else {
- probeWrite(pid, buff);
- buff.clear();
- needToClear = true;
- }
+ }
+ probeWrite(pid, buff);
+ buff.clear();
+ needToClear = true;
}
- }
- else { //pid is Resident
+ } else { //pid is Resident
while (true) {
-
- if (!probeTupAppenderToResident.append(accessorProbe, i)) {
- inMemJoiner.join(probeResBuff, writer);
- probeTupAppenderToResident.reset(probeResBuff, true);
- } else{
- break;
+                        if (probeTupAppenderToResident.append(accessorProbe, i)) {
+ break;
}
- }
-
+ inMemJoiner.join(probeResBuff, writer);
+ probeTupAppenderToResident.reset(probeResBuff, true);
+ }
+
}
- probePSizeInTups[pid]++;
- }
-
+ probePSizeInTups[pid]++;
+ }
+
}
-
-
+
}
-
- public void closeProbe(IFrameWriter writer) throws HyracksDataException{ //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
- inMemJoiner.join(probeResBuff, writer);
- inMemJoiner.closeJoin(writer);
-
- for(int pid=pStatus.nextSetBit(0); pid>=0; pid=pStatus.nextSetBit(pid+1)){
- ByteBuffer buff = sPartBuffs[ curPBuff[pid] ];
+
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here; that decision (which join technique to use) is made at the descriptor level
+ inMemJoiner.join(probeResBuff, writer);
+ inMemJoiner.closeJoin(writer);
+
+ for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
+ ByteBuffer buff = sPartBuffs[curPBuff[pid]];
accessorProbe.reset(buff);
if (accessorProbe.getTupleCount() > 0) {
- probeWrite(pid, buff);
+ probeWrite(pid, buff);
}
closeProbeWriter(pid);
- }
+ }
}
-
-
+
private void probeWrite(int pid, ByteBuffer buff) throws HyracksDataException {
RunFileWriter pWriter = probeRFWriters[pid];
if (pWriter == null) {
- FileReference file = ctx.createManagedWorkspaceFile(rel1Name);
- pWriter = new RunFileWriter(file, ctx.getIOManager());
+ FileReference file = ctx.createManagedWorkspaceFile(rel1Name);
+ pWriter = new RunFileWriter(file, ctx.getIOManager());
pWriter.open();
probeRFWriters[pid] = pWriter;
}
pWriter.nextFrame(buff);
}
-
+
private void closeProbeWriter(int pid) throws HyracksDataException {
RunFileWriter writer = probeRFWriters[pid];
if (writer != null) {
writer.close();
}
}
-
- public RunFileReader getBuildRFReader(int pid) throws HyracksDataException{
- return ((buildRFWriters[pid]==null) ? null: (buildRFWriters[pid]).createReader());
+
+ public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+ return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader());
}
-
- public long getBuildPartitionSize(int pid){
- return ((buildRFWriters[pid]==null) ? 0: buildRFWriters[pid].getFileSize() );
+
+ public long getBuildPartitionSize(int pid) {
+ return ((buildRFWriters[pid] == null) ? 0 : buildRFWriters[pid].getFileSize());
}
-
- public int getBuildPartitionSizeInTup(int pid){
- return (buildPSizeInTups[pid]);
+
+ public int getBuildPartitionSizeInTup(int pid) {
+ return (buildPSizeInTups[pid]);
}
-
- public RunFileReader getProbeRFReader(int pid) throws HyracksDataException{
- return ((probeRFWriters[pid]==null) ? null: (probeRFWriters[pid]).createReader());
+
+ public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+ return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader());
}
-
- public long getProbePartitionSize(int pid){
- return ((probeRFWriters[pid]==null) ? 0: probeRFWriters[pid].getFileSize() );
+
+ public long getProbePartitionSize(int pid) {
+ return ((probeRFWriters[pid] == null) ? 0 : probeRFWriters[pid].getFileSize());
}
-
- public int getProbePartitionSizeInTup(int pid){
- return (probePSizeInTups[pid]);
+
+ public int getProbePartitionSizeInTup(int pid) {
+ return (probePSizeInTups[pid]);
}
-
- public int getMaxBuildPartitionSize(){
- int max = buildPSizeInTups[0];
- for(int i=1; i<buildPSizeInTups.length; i++){
- if( buildPSizeInTups[i] > max ){
- max = buildPSizeInTups[i];
- }
- }
- return max;
+
+ public int getMaxBuildPartitionSize() {
+ int max = buildPSizeInTups[0];
+ for (int i = 1; i < buildPSizeInTups.length; i++) {
+ if (buildPSizeInTups[i] > max) {
+ max = buildPSizeInTups[i];
+ }
+ }
+ return max;
}
-
- public int getMaxProbePartitionSize(){
- int max = probePSizeInTups[0];
- for(int i=1; i<probePSizeInTups.length; i++){
- if( probePSizeInTups[i] > max ){
- max = probePSizeInTups[i];
- }
- }
- return max;
+
+ public int getMaxProbePartitionSize() {
+ int max = probePSizeInTups[0];
+ for (int i = 1; i < probePSizeInTups.length; i++) {
+ if (probePSizeInTups[i] > max) {
+ max = probePSizeInTups[i];
+ }
+ }
+ return max;
}
-
- public BitSet getPartitinStatus(){
- return pStatus;
+
+ public BitSet getPartitinStatus() {
+ return pStatus;
}
-
- public String _stats_debug_getStats(){
- int numOfResidentPartitions = 0;
- int numOfSpilledPartitions = 0;
- double sumOfBuildSpilledSizes = 0;
- double sumOfProbeSpilledSizes = 0;
- int numOfInMemTups = 0;
- for(int i=0; i<numOfPartitions; i++){
- if(pStatus.get(i)){ //Spilled
- numOfSpilledPartitions++;
- sumOfBuildSpilledSizes += buildPSizeInTups[i];
- sumOfProbeSpilledSizes += probePSizeInTups[i];
- }
- else{ //Resident
- numOfResidentPartitions++;
- numOfInMemTups += buildPSizeInTups[i];
- }
- }
-
- double avgBuildSpSz = sumOfBuildSpilledSizes/numOfSpilledPartitions;
- double avgProbeSpSz = sumOfProbeSpilledSizes/numOfSpilledPartitions;
- String s = "Resident Partitions:\t"+numOfResidentPartitions+"\nSpilled Partitions:\t"+numOfSpilledPartitions+
- "\nAvg Build Spilled Size:\t"+avgBuildSpSz+"\nAvg Probe Spilled Size:\t"+avgProbeSpSz+
- "\nIn-Memory Tups:\t"+numOfInMemTups+"\nNum of Free Buffers:\t"+freeFramesCounter;
- return s;
+
+ public String _stats_debug_getStats() {
+ int numOfResidentPartitions = 0;
+ int numOfSpilledPartitions = 0;
+ double sumOfBuildSpilledSizes = 0;
+ double sumOfProbeSpilledSizes = 0;
+ int numOfInMemTups = 0;
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (pStatus.get(i)) { //Spilled
+ numOfSpilledPartitions++;
+ sumOfBuildSpilledSizes += buildPSizeInTups[i];
+ sumOfProbeSpilledSizes += probePSizeInTups[i];
+ } else { //Resident
+ numOfResidentPartitions++;
+ numOfInMemTups += buildPSizeInTups[i];
+ }
+ }
+
+ double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
+ double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
+ String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
+ + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
+ + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
+ + freeFramesCounter;
+ return s;
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index bcb46f5..e6a7035 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -40,42 +40,83 @@
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+/**
+ * @author pouria
+ *         This class guides the joining process, and switches between different
+ *         joining techniques, w.r.t. the implemented optimizations and the skew in the
+ *         size of the partitions.
+ *         - Operator overview:
+ *         Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size
+ *         of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our
+ *         implementation the Probe phase can apply HHJ recursively, based on the value of M and the sizes of R and S.
+ *         The HHJ phases proceed as follows:
+ *         BUILD:
+ *         - Calculate the number of partitions (based on the size of R, the fudge factor and M) [See Shapiro's paper for the detailed discussion].
+ *         - Initialize the build phase (one frame per partition; all partitions are considered resident at first).
+ *         - Read the tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
+ *         its target partition and try to append it to that partition:
+ *             - If the target partition's buffer is full, try to allocate a new buffer for it.
+ *             - If no free buffer is available, find the largest resident partition and spill it. Using its freed
+ *               buffers after spilling, allocate a new buffer for the target partition.
+ *         - Being done with R, close the build phase. (During closing we write the very last buffer of each
+ *         spilled partition to disk, and we do partition tuning, where we try to bring back into memory as many
+ *         buffers of the spilled partitions as possible, based on the free buffers. We stop at the point where the
+ *         remaining free buffers are not enough for reloading an entire partition back into memory.)
+ *         - Create the hash table for the resident partitions (basically we create an in-memory hash join here).
+ *         PROBE:
+ *         - Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
+ *         for all the resident partitions together).
+ *         - Read the tuples of S, frame by frame, and hash each tuple T to its target partition P:
+ *             - If P is a resident partition, pass T to the in-memory hash join and generate the output record
+ *               if any matching record(s) are found.
+ *             - If P is spilled, write T to the dedicated buffer for P (on the probe side).
+ *         - Once the scan of S is done, we try to join each pair (Ri, Si) of the spilled partitions:
+ *             - If either Ri or Si is smaller than M, then we simply use an in-memory hash join to join them.
+ *             - Otherwise we apply HHJ recursively:
+ *                 - If applying HHJ recursively does not gain enough size reduction (the maximum size of the
+ *                   resulting partitions is more than 80% of the initial size of Ri and Si), then we switch to
+ *                   nested loop join.
+ *         - (At each step of partition-pair joining, we consider role reversal, which means if the size of Si is
+ *         greater than that of Ri, then we make sure to switch the build/probe roles between them.)
+ */
public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-
- private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+
+ private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
- private static final long serialVersionUID = 1L;
- private static final double NLJ_SWITCH_THRESHOLD = 0.8;
-
-
+
+ private static final long serialVersionUID = 1L;
+ private static final double NLJ_SWITCH_THRESHOLD = 0.8;
+
private static final String PROBE_REL = "RelR";
private static final String BUILD_REL = "RelS";
-
+
private final int memsize;
- private final int inputsize0;
- private final double factor;
+ private final int inputsize0;
+ private final double fudgeFactor;
private final int[] probeKeys;
private final int[] buildKeys;
private final IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories;
- private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
- private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
- private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
-
+ private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
+ private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
+ private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
+
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
-
- public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1,
- boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
-
+
+ public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
+ int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+ ITuplePairComparatorFactory tupPaircomparatorFactory0,
+ ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
- this.factor = factor;
+ this.fudgeFactor = factor;
this.probeKeys = keys0;
this.buildKeys = keys1;
this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
@@ -85,18 +126,19 @@
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
-
+
}
-
- public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize,int inputsize0,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
- throws HyracksDataException {
-
+
+ public OptimizedHybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, double factor,
+ int[] keys0, int[] keys1, IBinaryHashFunctionGeneratorFactory[] hashFunctionGeneratorFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
+ ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
+ throws HyracksDataException {
+
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
- this.factor = factor;
+ this.fudgeFactor = factor;
this.probeKeys = keys0;
this.buildKeys = keys1;
this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
@@ -107,12 +149,12 @@
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
}
-
+
@Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
BUILD_AND_PARTITION_ACTIVITY_ID));
- ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
+ ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
PARTITION_AND_JOIN_ACTIVITY_ID));
builder.addActivity(phase1);
@@ -124,43 +166,36 @@
builder.addBlockingEdge(phase1, phase2);
builder.addTargetEdge(0, phase2, 0);
-
- }
-
-
- //memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
- private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) throws HyracksDataException{
- int B = 0;
- if (memorySize > 1) {
- if (memorySize > buildSize) {
- return 1; //We will switch to in-Mem HJ eventually
- }
- else {
- B = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize)
- / (double) (memorySize - 1)));
- if (B <= 0) {
- B = 1; //becomes in-memory hash join
- }
- }
- if(B > memorySize){
- B = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
- return ( B<memorySize ? B : memorySize );
- }
- }
-
- else {
- throw new HyracksDataException("not enough memory for Hybrid Hash Join");
- }
- return B;
}
-
+
+ //memorySize is the memory for join (we have already excluded the 2 buffers for in/out)
+ private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
+ throws HyracksDataException {
+ int numberOfPartitions = 0;
+ if (memorySize <= 1) {
+ throw new HyracksDataException("not enough memory is available for Hybrid Hash Join");
+ }
+ if (memorySize > buildSize) {
+ return 1; //We will switch to in-Mem HJ eventually
+ }
+ numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize) / (double) (memorySize - 1)));
+ if (numberOfPartitions <= 0) {
+ numberOfPartitions = 1; //becomes in-memory hash join
+ }
+ if (numberOfPartitions > memorySize) {
+ numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
+ return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize);
+ }
+ return numberOfPartitions;
+ }
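+
+    //Worked example on hypothetical sizes: memorySize = 100 frames, buildSize = 1000 frames,
+    //factor = 1.2 and nPartitions = 1 give ceil((1000 * 1.2 / 1 - 100) / (100 - 1)) =
+    //ceil(1100 / 99) = 12 partitions, which is <= memorySize, so 12 is returned as-is.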
+
public static class BuildAndPartitionTaskState extends AbstractTaskState {
-
- private int memForJoin;
- private int numOfPartitions;
- private OptimizedHybridHashJoin hybridHJ;
-
+
+ private int memForJoin;
+ private int numOfPartitions;
+ private OptimizedHybridHashJoin hybridHJ;
+
public BuildAndPartitionTaskState() {
}
@@ -180,388 +215,400 @@
}
-
+ /*
+ * Build phase of Hybrid Hash Join:
+     * An instance of Hybrid Hash Join is created, using Shapiro's formula
+     * to get the optimal number of partitions; the build relation is read and
+     * partitioned, and the hybrid hash join instance gets ready for probing.
+     * (See OptimizedHybridHashJoin for the details of the different steps.)
+ */
private class PartitionAndBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
-
-
+
public PartitionAndBuildActivityNode(ActivityId id) {
super(id);
}
-
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-
-
-
+
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
-
+
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
+ for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
-
+
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
-
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
-
- ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories).createPartitioner(0);
- ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories).createPartitioner(0);
-
-
+ private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
+
+ ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
+ hashFunctionGeneratorFactories).createPartitioner(0);
+ ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
+ hashFunctionGeneratorFactories).createPartitioner(0);
+
@Override
public void open() throws HyracksDataException {
-
- if(memsize > 2){
- state.memForJoin = memsize - 2;
- state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, factor, nPartitions);
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
- PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators,
- probeRd, buildRd, probeHpc, buildHpc);
- state.hybridHJ.initBuild();
- }
- else{
- throw new HyracksDataException("not enough memory for Hybrid Hash Join");
- }
-
+                    if (memsize <= 2) { //Dedicated buffers: One buffer to read and one buffer for output
+ throw new HyracksDataException("not enough memory for Hybrid Hash Join");
+ }
+ state.memForJoin = memsize - 2;
+ state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+ buildHpc);
+ state.hybridHJ.initBuild();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
state.hybridHJ.build(buffer);
}
-
+
@Override
public void close() throws HyracksDataException {
- state.hybridHJ.closeBuild();
- env.setTaskState(state);
+ state.hybridHJ.closeBuild();
+ env.setTaskState(state);
}
- @Override
- public void fail() throws HyracksDataException {
- }
-
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
};
return op;
}
}
-
+ /*
+ * Probe phase of Hybrid Hash Join:
+     * The probe side is read and partitioned; resident tuples get joined with
+     * the build-side residents (through the HybridHashJoin instance created in the build phase),
+     * and spilled partitions get written to run files. During the close() call, each pair of spilled
+     * partitions (a build-side spilled partition and its corresponding probe-side spilled partition)
+     * is joined by applying Hybrid Hash Join recursively on them.
+ */
private class ProbeAndJoinActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
- private int maxLevel = -1;
-
- public ProbeAndJoinActivityNode(ActivityId id){
- super(id);
- }
+ private static final long serialVersionUID = 1L;
+
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+ public ProbeAndJoinActivityNode(ActivityId id) {
+ super(id);
+ }
- final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
- final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
+ final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator();
+ final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator();
- final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
- if (isLeftOuter) {
- for (int i = 0; i < nullWriterFactories1.length; i++) {
- nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
- }
- }
+ for (int i = 0; i < comparatorFactories.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
- IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private BuildAndPartitionTaskState state;
- private ByteBuffer rPartbuff = ctx.allocateFrame();
-
- private ITuplePartitionComputerGeneratorFactory hpcf0 = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories);
- private ITuplePartitionComputerGeneratorFactory hpcf1 = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories);
-
- private ITuplePartitionComputer hpcRep0;
- private ITuplePartitionComputer hpcRep1;
-
- @Override
- public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private BuildAndPartitionTaskState state;
+ private ByteBuffer rPartbuff = ctx.allocateFrame();
+
+ private ITuplePartitionComputerGeneratorFactory hpcf0 = new FieldHashPartitionComputerGeneratorFactory(
+ probeKeys, hashFunctionGeneratorFactories);
+ private ITuplePartitionComputerGeneratorFactory hpcf1 = new FieldHashPartitionComputerGeneratorFactory(
+ buildKeys, hashFunctionGeneratorFactories);
+
+ private ITuplePartitionComputer hpcRep0;
+ private ITuplePartitionComputer hpcRep1;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
- writer.open();
- state.hybridHJ.initProbe();
-
- }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.hybridHJ.probe(buffer, writer);
- }
+ writer.open();
+ state.hybridHJ.initProbe();
- @Override
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ state.hybridHJ.probe(buffer, writer);
+ }
+
+ @Override
public void fail() throws HyracksDataException {
writer.fail();
}
- @Override
- public void close() throws HyracksDataException {
+ @Override
+ public void close() throws HyracksDataException {
- state.hybridHJ.closeProbe(writer);
-
- BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
- hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0).createPartitioner(0);
- hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1).createPartitioner(0);
-
- rPartbuff.clear();
- for(int pid=partitionStatus.nextSetBit(0); pid>=0; pid=partitionStatus.nextSetBit(pid+1)){
+ state.hybridHJ.closeProbe(writer);
- RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
- RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-
- if (bReader == null || pReader == null) {
- continue;
- }
- int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
- int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
- int beforeMax = (bSize>pSize) ? bSize : pSize;
- joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-
- }
- writer.close();
- }
-
- private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader, RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException{
- ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys, hashFunctionGeneratorFactories).createPartitioner(level);
- ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys, hashFunctionGeneratorFactories).createPartitioner(level);
-
- if(level > maxLevel){
- maxLevel = level;
- }
-
- long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
- long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
-
- //Apply in-Mem HJ if possible
- if( (buildPartSize<state.memForJoin) || (probePartSize<state.memForJoin) ){
- int tabSize = -1;
- if(buildPartSize<probePartSize){
- tabSize = ohhj.getBuildPartitionSizeInTup(pid);
- if(tabSize>0){ //Build Side is smaller
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0, buildSideReader, probeSideReader);
- }
- }
- else{ //Role Reversal
- tabSize = ohhj.getProbePartitionSizeInTup(pid);
- if(tabSize>0){ //Probe Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1, probeSideReader, buildSideReader);
- }
- }
- }
- //Apply (Recursive) HHJ
- else {
- OptimizedHybridHashJoin rHHj;
- if(buildPartSize<probePartSize){ //Build Side is smaller
-
- int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, factor, nPartitions);
- rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin,
- n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators,
- probeRd, buildRd, probeHpc, buildHpc);
-
-
- buildSideReader.open();
- rHHj.initBuild();
- rPartbuff.clear();
- while (buildSideReader.nextFrame(rPartbuff)) {
- rHHj.build(rPartbuff);
- }
-
- rHHj.closeBuild();
-
- probeSideReader.open();
- rHHj.initProbe();
- rPartbuff.clear();
- while (probeSideReader.nextFrame(rPartbuff)) {
- rHHj.probe(rPartbuff, writer);
- }
- rHHj.closeProbe(writer);
-
- int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
- int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
- int afterMax = (maxAfterBuildSize>maxAfterProbeSize) ? maxAfterBuildSize : maxAfterProbeSize;
-
- BitSet rPStatus = rHHj.getPartitinStatus();
- if(afterMax < NLJ_SWITCH_THRESHOLD*beforeMax){
- for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
- if (rbrfw == null || rprfw == null) {
- continue;
- }
-
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level+1));
- }
-
- }
- else{ //Switch to NLJ (Further recursion seems not to be useful)
- for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
- if (rbrfw == null || rprfw == null) {
- continue;
- }
-
- int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
- int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
- if(buildSideInTups<probeSideInTups){
- applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw, nljComparator0);
- }
- else{
- applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw, nljComparator1);
- }
- }
- }
- }
- else{ //Role Reversal (Probe Side is smaller)
- int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, factor, nPartitions);
- rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin,
- n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators,
- buildRd, probeRd,buildHpc, probeHpc);
-
- probeSideReader.open();
- rHHj.initBuild();
- rPartbuff.clear();
- while (probeSideReader.nextFrame(rPartbuff)) {
- rHHj.build(rPartbuff);
- }
- rHHj.closeBuild();
- rHHj.initProbe();
- buildSideReader.open();
- rPartbuff.clear();
- while (buildSideReader.nextFrame(rPartbuff)) {
- rHHj.probe(rPartbuff, writer);
- }
- rHHj.closeProbe(writer);
- int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
- int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
- int afterMax = (maxAfterBuildSize>maxAfterProbeSize) ? maxAfterBuildSize : maxAfterProbeSize;
- BitSet rPStatus = rHHj.getPartitinStatus();
-
- if(afterMax < NLJ_SWITCH_THRESHOLD*beforeMax){
- for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
- if (rbrfw == null || rprfw == null) {
- continue;
- }
-
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level+1));
- }
- }
- else{ //Switch to NLJ (Further recursion seems not to be effective)
- for(int rPid=rPStatus.nextSetBit(0); rPid>=0; rPid=rPStatus.nextSetBit(rPid+1)){
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
- if (rbrfw == null || rprfw == null) {
- continue;
- }
-
- long buildSideSize = rbrfw.getFileSize();
- long probeSideSize = rprfw.getFileSize();
- if(buildSideSize>probeSideSize){
- applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw, nljComparator1);
- }
- else{
- applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw, nljComparator0);
- }
- }
- }
- }
- buildSideReader.close();
- probeSideReader.close();
- }
- }
-
- private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize,
- RecordDescriptor smallerRd, RecordDescriptor largerRd,
- ITuplePartitionComputer hpcRepLarger, ITuplePartitionComputer hpcRepSmaller,
- RunFileReader bReader, RunFileReader pReader) throws HyracksDataException{
-
- ISerializableTable table = new SerializableHashTable(tabSize, ctx);
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize,
- new FrameTupleAccessor(ctx.getFrameSize(), largerRd), hpcRepLarger,
- new FrameTupleAccessor(ctx.getFrameSize(), smallerRd), hpcRepSmaller,
- new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table);
+ BitSet partitionStatus = state.hybridHJ.getPartitinStatus();
+ hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
+ .createPartitioner(0);
+ hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
+ .createPartitioner(0);
- bReader.open();
- rPartbuff.clear();
- while (bReader.nextFrame(rPartbuff)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(rPartbuff, copyBuffer);
- FrameUtils.makeReadable(copyBuffer);
- joiner.build(copyBuffer);
- rPartbuff.clear();
- }
- bReader.close();
- rPartbuff.clear();
- // probe
- pReader.open();
- while (pReader.nextFrame(rPartbuff)) {
- joiner.join(rPartbuff, writer);
- rPartbuff.clear();
- }
- pReader.close();
- joiner.closeJoin(writer);
- }
+ rPartbuff.clear();
+ for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
- private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljCompaarator) throws HyracksDataException{
-
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd),
- nljCompaarator, memorySize);
+ RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
+ RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+ if (bReader == null || pReader == null) { //either side (or both) has no tuples, so there is no potential match and nothing to join
+ continue;
+ }
+ int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
+ int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+ int beforeMax = (bSize > pSize) ? bSize : pSize;
+ joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
- ByteBuffer cacheBuff = ctx.allocateFrame();
- innerReader.open();
- while(innerReader.nextFrame(cacheBuff)){
- FrameUtils.makeReadable(cacheBuff);
- nlj.cache(cacheBuff);
- cacheBuff.clear();
- }
- nlj.closeCache();
-
- ByteBuffer joinBuff = ctx.allocateFrame();
- outerReader.open();
-
- while(outerReader.nextFrame(joinBuff)){
- FrameUtils.makeReadable(joinBuff);
- nlj.join(joinBuff, writer);
- joinBuff.clear();
- }
-
- nlj.closeJoin(writer);
- outerReader.close();
- innerReader.close();
- }
- };
- return op;
- }
+ }
+ writer.close();
+ }
+
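+ /**
+ * Joins one spilled partition pair at the given recursion level, choosing
+ * among in-memory hash join, recursive HHJ, and nested-loop join.
+ * beforeMax is the larger of the pair's sizes from the previous round; it
+ * is compared with the largest partition produced by repartitioning to
+ * decide whether another round of hashing is worthwhile.
+ */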
+ private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
+ RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
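+ //Seed the partitioners with the recursion level so each round uses a
+ //different hash function than the one that created these partitions.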
+ ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerGeneratorFactory(probeKeys,
+ hashFunctionGeneratorFactories).createPartitioner(level);
+ ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerGeneratorFactory(buildKeys,
+ hashFunctionGeneratorFactories).createPartitioner(level);
+
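+ //Convert partition sizes from bytes to frames, the same unit as the
+ //memForJoin budget.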
+ long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
+ long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+
+ //Apply in-memory HJ if either side fits in the memory budget
+ if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
+ int tabSize = -1;
+ if (buildPartSize < probePartSize) {
+ tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Build Side is smaller
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
+ buildSideReader, probeSideReader);
+
+ } else { //Role Reversal
+ tabSize = ohhj.getProbePartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException("Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Probe Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
+ probeSideReader, buildSideReader);
+ }
+ }
+ //Apply (Recursive) HHJ
+ else {
+ OptimizedHybridHashJoin rHHj;
+ if (buildPartSize < probePartSize) { //Build Side is smaller
+
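+ //Choose the next round's partition count from the memory budget, the
+ //smaller (build) side's size in frames, and the fudge factor.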
+ int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor, nPartitions);
+ rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
+
+ buildSideReader.open();
+ rHHj.initBuild();
+ rPartbuff.clear();
+ while (buildSideReader.nextFrame(rPartbuff)) {
+ rHHj.build(rPartbuff);
+ }
+
+ rHHj.closeBuild();
+
+ probeSideReader.open();
+ rHHj.initProbe();
+ rPartbuff.clear();
+ while (probeSideReader.nextFrame(rPartbuff)) {
+ rHHj.probe(rPartbuff, writer);
+ }
+ rHHj.closeProbe(writer);
+
+ int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+ int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+ int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
+ : maxAfterProbeSize;
+
+ BitSet rPStatus = rHHj.getPartitinStatus();
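+ //If the largest partition shrank enough relative to the previous round,
+ //keep recursing with HHJ; otherwise hashing is no longer effective
+ //(e.g. under heavy key skew), so switch to nested-loop joins.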
+ if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+ if (rbrfw == null || rprfw == null) {
+ continue;
+ }
+
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+ }
+
+ } else { //Switch to NLJ (further recursion is unlikely to help)
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+ if (rbrfw == null || rprfw == null) {
+ continue;
+ }
+
+ int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+ int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+ if (buildSideInTups < probeSideInTups) {
+ applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+ nljComparator0);
+ } else {
+ applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
+ nljComparator1);
+ }
+ }
+ }
+ } else { //Role Reversal (Probe Side is smaller)
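+ //The smaller (original probe) side becomes the build relation of the
+ //inner join, so the two readers swap roles below.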
+ int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor, nPartitions);
+ rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
+
+ probeSideReader.open();
+ rHHj.initBuild();
+ rPartbuff.clear();
+ while (probeSideReader.nextFrame(rPartbuff)) {
+ rHHj.build(rPartbuff);
+ }
+ rHHj.closeBuild();
+ rHHj.initProbe();
+ buildSideReader.open();
+ rPartbuff.clear();
+ while (buildSideReader.nextFrame(rPartbuff)) {
+ rHHj.probe(rPartbuff, writer);
+ }
+ rHHj.closeProbe(writer);
+ int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+ int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+ int afterMax = (maxAfterBuildSize > maxAfterProbeSize) ? maxAfterBuildSize
+ : maxAfterProbeSize;
+ BitSet rPStatus = rHHj.getPartitinStatus();
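+ //Same shrinkage test as in the non-reversed branch: recurse while
+ //repartitioning pays off, otherwise fall back to nested-loop join.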
+
+ if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+ if (rbrfw == null || rprfw == null) {
+ continue;
+ }
+
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+ }
+ } else { //Switch to NLJ (further recursion is unlikely to help)
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+ if (rbrfw == null || rprfw == null) {
+ continue;
+ }
+
+ long buildSideSize = rbrfw.getFileSize();
+ long probeSideSize = rprfw.getFileSize();
+ if (buildSideSize > probeSideSize) {
+ applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
+ nljComparator1);
+ } else {
+ applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
+ nljComparator0);
+ }
+ }
+ }
+ }
+ buildSideReader.close();
+ probeSideReader.close();
+ }
+ }
+
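+ /**
+ * Classic in-memory hash join over one partition pair: the bReader side is
+ * loaded into a serializable hash table, then the pReader side is streamed
+ * against it. Callers may pass the sides role-reversed, so "b"/"p" denote
+ * the table and stream sides of this call, not necessarily the outer
+ * join's build and probe relations.
+ */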
+ private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
+ RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
+ ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+ throws HyracksDataException {
+
+ ISerializableTable table = new SerializableHashTable(tabSize, ctx);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
+ ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
+ buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
+ isLeftOuter, nullWriters1, table);
+
+ bReader.open();
+ rPartbuff.clear();
+ while (bReader.nextFrame(rPartbuff)) {
+ //Allocate a fresh copy: this buffer is retained in the InMemoryHashJoin's internal buffer list
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(rPartbuff, copyBuffer);
+ FrameUtils.makeReadable(copyBuffer);
+ joiner.build(copyBuffer);
+ rPartbuff.clear();
+ }
+ bReader.close();
+ rPartbuff.clear();
+ // probe
+ pReader.open();
+ while (pReader.nextFrame(rPartbuff)) {
+ joiner.join(rPartbuff, writer);
+ rPartbuff.clear();
+ }
+ pReader.close();
+ joiner.closeJoin(writer);
+ }
+
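+ /**
+ * Block nested-loop join for a partition pair on which hash partitioning
+ * stopped paying off: the inner side is cached in the available frames,
+ * then the outer side is streamed and joined against the cache.
+ */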
+ private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
+ RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator)
+ throws HyracksDataException {
+
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
+
+ ByteBuffer cacheBuff = ctx.allocateFrame();
+ innerReader.open();
+ while (innerReader.nextFrame(cacheBuff)) {
+ FrameUtils.makeReadable(cacheBuff);
+ nlj.cache(cacheBuff);
+ cacheBuff.clear();
+ }
+ nlj.closeCache();
+
+ ByteBuffer joinBuff = ctx.allocateFrame();
+ outerReader.open();
+
+ while (outerReader.nextFrame(joinBuff)) {
+ FrameUtils.makeReadable(joinBuff);
+ nlj.join(joinBuff, writer);
+ joinBuff.clear();
+ }
+
+ nlj.closeJoin(writer);
+ outerReader.close();
+ innerReader.close();
+ }
+ };
+ return op;
+ }
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index 96a24b3..3820119 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -3,7 +3,7 @@
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
+
import org.junit.Test;
@@ -68,7 +68,7 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/customer4.tbl"))) };
+ "data/tpch0.001/customer4.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -78,7 +78,7 @@
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/orders4.tbl"))) };
+ "data/tpch0.001/orders4.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
@@ -153,7 +153,7 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/customer3.tbl"))) };
+ "data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -163,14 +163,9 @@
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/orders4.tbl"))) };
+ "data/tpch0.001/orders4.tbl"))) };
- /*
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/orders4.tbl"))) };
- */
-
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -243,7 +238,7 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/customer3.tbl"))) };
+ "data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -253,13 +248,9 @@
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/orders1.tbl"))) };
+ "data/tpch0.001/orders1.tbl"))) };
- /*
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/tpch0.001/pouria/integration/orders4.tbl"))) };
- */
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {