[ASTERIXDB-2577][RT] One frame per spilled partition in optimized hybrid hash join
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Makes sure there is one frame for each spilled partition during build close.
Checks the length of a record before blindly flushing it to disk as a large
record.
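For context, a minimal, self-contained sketch (not part of this change) of the
invariant enforced at build close and verified by the new integration test:
after reserving memory for the resident partitions and the in-memory hash
table, at least one frame must remain for each spilled partition. All names
(hasOneFramePerSpilledPartition, partitionBytes, hashTableBytes) and the
numbers in main are illustrative assumptions, not code from this patch.

import java.util.BitSet;

public final class SpilledPartitionBudgetSketch {
    // Hypothetical illustration only; it mirrors the production accounting approximately.
    // Returns true if the memory budget still covers one frame per spilled partition
    // after accounting for resident partitions and the in-memory hash table.
    static boolean hasOneFramePerSpilledPartition(long memSizeInFrames, int frameSize, long[] partitionBytes,
            BitSet spilled, long hashTableBytes) {
        long residentBytes = 0;
        for (int p = spilled.nextClearBit(0); p < partitionBytes.length; p = spilled.nextClearBit(p + 1)) {
            residentBytes += partitionBytes[p]; // memory held by resident (non-spilled) partitions
        }
        long free = memSizeInFrames * frameSize - residentBytes - hashTableBytes;
        return free >= (long) spilled.cardinality() * frameSize;
    }

    public static void main(String[] args) {
        BitSet spilled = new BitSet(3);
        spilled.set(2); // partition 2 has been spilled to disk
        long[] partitionBytes = { 2 * 32768, 32768, 0 };
        // 10 frames of 32KB, resident partitions using 3 frames, hash table using 3 frames:
        // 4 frames remain, which covers the 1 frame reserved for the spilled partition.
        System.out.println(hasOneFramePerSpilledPartition(10, 32768, partitionBytes, spilled, 3 * 32768));
    }
}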
Change-Id: I82d4da57e9a9835cab61dc5cc43c45c728e0c2b6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3485
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..731e7ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -168,7 +168,7 @@
numTuples[partition]--;
}
- private static int calculateActualSize(int[] fieldEndOffsets, int size) {
+ public static int calculateActualSize(int[] fieldEndOffsets, int size) {
if (fieldEndOffsets != null) {
return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
}
@@ -204,7 +204,7 @@
private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
throws HyracksDataException {
- assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+ assert bufferInfo.getStartOffset() == 0 : "Haven't supported yet in FrameTupleAppender";
if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
appendFrame.reset(bufferInfo.getBuffer());
appender.reset(appendFrame, false);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..7b6dcdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -62,46 +62,37 @@
PROBE
}
- private IHyracksTaskContext ctx;
+ private final IHyracksTaskContext ctx;
private final String buildRelName;
private final String probeRelName;
-
private final ITuplePairComparator comparator;
private final ITuplePartitionComputer buildHpc;
private final ITuplePartitionComputer probeHpc;
-
private final RecordDescriptor buildRd;
private final RecordDescriptor probeRd;
-
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+ private final RunFileWriter[] buildRFWriters; //writing spilled build partitions
+ private final RunFileWriter[] probeRFWriters; //writing spilled probe partitions
private final IPredicateEvaluator predEvaluator;
private final boolean isLeftOuter;
private final IMissingWriter[] nonMatchWriters;
-
private final BitSet spilledStatus; //0=resident, 1=spilled
private final int numOfPartitions;
private final int memSizeInFrames;
private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
-
private IPartitionedTupleBufferManager bufferManager;
private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
-
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAccessor accessorProbe;
-
- private IDeallocatableFramePool framePool;
private ISimpleFrameBufferManager bufferManagerForHashTable;
-
- private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
-
+ // Added to ensure the predicate evaluator is called correctly upon recursive calls that cause role reversal.
+ private boolean isReversed;
// stats information
private int[] buildPSizeInTups;
private IFrame reloadBuffer;
- private TuplePointer tempPtr = new TuplePointer(); // this is a reusable object to store the pointer,which is not used anywhere.
- // we mainly use it to match the corresponding function signature.
+ // This is a reusable object to store the pointer, which is not used anywhere. We mainly use it to match the
+ // corresponding function signature.
+ private final TuplePointer tempPtr = new TuplePointer();
private int[] probePSizeInTups;
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
@@ -117,20 +108,15 @@
this.comparator = comparator;
this.buildRelName = buildRelName;
this.probeRelName = probeRelName;
-
this.numOfPartitions = numOfPartitions;
this.buildRFWriters = new RunFileWriter[numOfPartitions];
this.probeRFWriters = new RunFileWriter[numOfPartitions];
-
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
-
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
this.isReversed = false;
-
this.spilledStatus = new BitSet(numOfPartitions);
-
this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -140,7 +126,8 @@
}
public void initBuild() throws HyracksDataException {
- framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+ IDeallocatableFramePool framePool =
+ new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
bufferManager = new VPartitionTupleBufferManager(
PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
@@ -153,32 +140,27 @@
public void build(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
-
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
- processTuple(i, pid);
+ processTupleBuildPhase(i, pid);
buildPSizeInTups[pid]++;
}
}
- private void processTuple(int tid, int pid) throws HyracksDataException {
+ private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- selectAndSpillVictim(pid);
+ int victimPartition = spillPolicy.selectVictimPartition(pid);
+ if (victimPartition < 0) {
+ throw new HyracksDataException(
+ "No more space left in the memory buffer, please assign more memory to hash-join.");
+ }
+ spillPartition(victimPartition);
}
}
- private void selectAndSpillVictim(int pid) throws HyracksDataException {
- int victimPartition = spillPolicy.selectVictimPartition(pid);
- if (victimPartition < 0) {
- throw new HyracksDataException(
- "No more space left in the memory buffer, please assign more memory to hash-join.");
- }
- spillPartition(victimPartition);
- }
-
private void spillPartition(int pid) throws HyracksDataException {
- RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+ RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(buildRFWriters, buildRelName, pid);
bufferManager.flushPartition(pid, writer);
bufferManager.clearPartition(pid);
spilledStatus.set(pid);
@@ -191,19 +173,8 @@
buildRFWriters[pid].close();
}
- private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- String refName = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- refName = buildRelName;
- break;
- case PROBE:
- refName = probeRelName;
- runFileWriters = probeRFWriters;
- break;
- }
+ private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(RunFileWriter[] runFileWriters, String refName,
+ int pid) throws HyracksDataException {
RunFileWriter writer = runFileWriters[pid];
if (writer == null) {
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
@@ -216,54 +187,55 @@
public void closeBuild() throws HyracksDataException {
// Flushes the remaining chunks of all the spilled partitions to the disk.
- closeAllSpilledPartitions(SIDE.BUILD);
+ closeAllSpilledPartitions(buildRFWriters, buildRelName);
// Makes space for the in-memory hash table (some partitions may need to be spilled to the disk
// during this step in order to make that space)
// and tries to bring back as many spilled partitions as possible if there is free space.
int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
- createInMemoryJoiner(inMemTupCount);
+ ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
+ this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
+ new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
+ predEvaluator, isReversed, bufferManagerForHashTable);
- loadDataInMemJoin();
+ buildHashTable();
+ }
+
+ public void clearBuildTempFiles() throws HyracksDataException {
+ clearTempFiles(buildRFWriters);
+ }
+
+ public void clearProbeTempFiles() throws HyracksDataException {
+ clearTempFiles(probeRFWriters);
}
/**
* In case a failure happens, we need to clean up the generated temporary files.
*/
- public void clearBuildTempFiles() throws HyracksDataException {
- for (int i = 0; i < buildRFWriters.length; i++) {
- if (buildRFWriters[i] != null) {
- buildRFWriters[i].erase();
+ private void clearTempFiles(RunFileWriter[] runFileWriters) throws HyracksDataException {
+ for (int i = 0; i < runFileWriters.length; i++) {
+ if (runFileWriters[i] != null) {
+ runFileWriters[i].erase();
}
}
}
- private void closeAllSpilledPartitions(SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- break;
- case PROBE:
- runFileWriters = probeRFWriters;
- break;
- }
+ private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, String refName) throws HyracksDataException {
try {
for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
spilledStatus.nextSetBit(pid + 1)) {
if (bufferManager.getNumTuples(pid) > 0) {
- bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+ bufferManager.flushPartition(pid,
+ getSpillWriterOrCreateNewOneIfNotExist(runFileWriters, refName, pid));
bufferManager.clearPartition(pid);
}
}
} finally {
// Force to close all run file writers.
- if (runFileWriters != null) {
- for (RunFileWriter runFileWriter : runFileWriters) {
- if (runFileWriter != null) {
- runFileWriter.close();
- }
+ for (RunFileWriter runFileWriter : runFileWriters) {
+ if (runFileWriter != null) {
+ runFileWriter.close();
}
}
}
@@ -278,93 +250,68 @@
* @throws HyracksDataException
*/
private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
- // we need number of |spilledPartitions| buffers to store the probe data
int frameSize = ctx.getInitialFrameSize();
long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
- // For partitions in main memory, we deduct their size from the free space.
int inMemTupCount = 0;
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
freeSpace -= bufferManager.getPhysicalSize(p);
inMemTupCount += buildPSizeInTups[p];
}
+ freeSpace -= SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
- // Calculates the expected hash table size for the given number of tuples in main memory
- // and deducts it from the free space.
- long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
- freeSpace -= hashTableByteSizeForInMemTuples;
+ return spillAndReloadPartitions(frameSize, freeSpace, inMemTupCount);
+ }
- // In the case where free space is less than zero after considering the hash table size,
- // we need to spill more partitions until we can accommodate the hash table in memory.
- // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
- // hash table, or keep spilling from the first partition)
- boolean moreSpilled = false;
-
- // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
- if (freeSpace < 0) {
- // Tries to find a best-fit partition not to spill many partitions.
- int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
+ private int spillAndReloadPartitions(int frameSize, long freeSpace, int inMemTupCount) throws HyracksDataException {
+ int pidToSpill, numberOfTuplesToBeSpilled;
+ long expectedHashTableSizeDecrease;
+ long currentFreeSpace = freeSpace;
+ int currentInMemTupCount = inMemTupCount;
+ // Spill some partitions if there is no free space.
+ while (currentFreeSpace < 0) {
+ pidToSpill = selectSinglePartitionToSpill(currentFreeSpace, currentInMemTupCount, frameSize);
if (pidToSpill >= 0) {
- // There is a suitable one. We spill that partition to the disk.
- long hashTableSizeDecrease =
- -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
- -buildPSizeInTups[pidToSpill], frameSize);
- freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
- inMemTupCount -= buildPSizeInTups[pidToSpill];
+ numberOfTuplesToBeSpilled = buildPSizeInTups[pidToSpill];
+ expectedHashTableSizeDecrease =
+ -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+ -numberOfTuplesToBeSpilled, frameSize);
+ currentInMemTupCount -= numberOfTuplesToBeSpilled;
+ currentFreeSpace +=
+ bufferManager.getPhysicalSize(pidToSpill) + expectedHashTableSizeDecrease - frameSize;
spillPartition(pidToSpill);
closeBuildPartition(pidToSpill);
- moreSpilled = true;
} else {
- // There is no single suitable partition. So, we need to spill multiple partitions to the disk
- // in order to accommodate the hash table.
- for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
- spilledStatus.nextClearBit(p + 1)) {
- int spaceToBeReturned = bufferManager.getPhysicalSize(p);
- int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
- if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
- continue;
- }
- spillPartition(p);
- closeBuildPartition(p);
- moreSpilled = true;
- // Since the number of tuples in memory has been decreased,
- // the hash table size will be decreased, too.
- // We put minus since the method returns a negative value to represent a newly reclaimed space.
- long expectedHashTableSizeDecrease =
- -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
- -numberOfTuplesToBeSpilled, frameSize);
- freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
- // Adjusts the hash table size
- inMemTupCount -= numberOfTuplesToBeSpilled;
- if (freeSpace >= 0) {
- break;
- }
- }
+ throw new HyracksDataException("Hash join does not have enough memory even after spilling.");
}
}
+ // Bring some partitions back in if there is enough space.
+ return bringPartitionsBack(currentFreeSpace, currentInMemTupCount, frameSize);
+ }
- // If more partitions have been spilled to the disk, calculate the expected hash table size again
- // before bringing some partitions to main memory.
- if (moreSpilled) {
- hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
- }
-
- // Brings back some partitions if there is enough free space.
+ /**
+ * Brings back some partitions if there is free memory and partitions that fit in that space.
+ *
+ * @param freeSpace current amount of free space in memory
+ * @param inMemTupCount number of in-memory tuples
+ * @param frameSize frame size in bytes
+ * @return number of in-memory tuples after bringing some (or no) partitions back into memory.
+ * @throws HyracksDataException
+ */
+ private int bringPartitionsBack(long freeSpace, int inMemTupCount, int frameSize) throws HyracksDataException {
int pid = 0;
- while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
- if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
- break;
- }
- long expectedHashTableByteSizeIncrease = SerializableHashTable
- .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
- freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
- inMemTupCount += buildPSizeInTups[pid];
- // Adjusts the hash table size
- hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
+ int currentMemoryTupleCount = inMemTupCount;
+ long currentFreeSpace = freeSpace;
+ while ((pid = selectAPartitionToReload(currentFreeSpace, pid, currentMemoryTupleCount)) >= 0
+ && loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+ currentMemoryTupleCount += buildPSizeInTups[pid];
+ // Reserve space for the loaded data and the hash table growth (the reloaded partition gives back its reserved frame).
+ currentFreeSpace = currentFreeSpace
+ - bufferManager.getPhysicalSize(pid) - SerializableHashTable
+ .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize)
+ + frameSize;
}
-
- return inMemTupCount;
+ return currentMemoryTupleCount;
}
/**
@@ -376,14 +323,18 @@
long spaceAfterSpill;
long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
int minSpaceAfterSpillPartID = -1;
-
+ int nextAvailablePidToSpill = -1;
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
continue;
}
+ if (nextAvailablePidToSpill < 0) {
+ nextAvailablePidToSpill = p;
+ }
// We put minus since the method returns a negative value to represent a newly reclaimed space.
- spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+ // One frame is deducted since a spilled partition needs one frame from free space.
+ spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) - frameSize + (-SerializableHashTable
.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
if (spaceAfterSpill == 0) {
// Found the perfect one. Just returns this partition.
@@ -394,18 +345,30 @@
minSpaceAfterSpillPartID = p;
}
}
- return minSpaceAfterSpillPartID;
+
+ return minSpaceAfterSpillPartID >= 0 ? minSpaceAfterSpillPartID : nextAvailablePidToSpill;
}
- private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
- for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
- spilledStatus.nextSetBit(i + 1)) {
- int spilledTupleCount = buildPSizeInTups[i];
- // Expected hash table size increase after reloading this partition
- long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
- inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
- if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
- return i;
+ /**
+ * Finds a partition that can fit in the leftover memory.
+ *
+ * @param freeSpace current free space
+ * @param pid partition id to start the search from
+ * @param inMemTupCount number of tuples currently in memory
+ * @return partition id of selected partition to reload
+ */
+ private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) {
+ int frameSize = ctx.getInitialFrameSize();
+ // Add one frame to freeSpace to account for the one frame reserved for the spilled partition
+ long totalFreeSpace = freeSpace + frameSize;
+ if (totalFreeSpace > 0) {
+ for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
+ spilledStatus.nextSetBit(i + 1)) {
+ int spilledTupleCount = buildPSizeInTups[i];
+ // Expected hash table size increase after reloading this partition
+ long expectedHashTableByteSizeIncrease = SerializableHashTable
+ .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, spilledTupleCount, frameSize);
+ if (totalFreeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
+ return i;
+ }
}
}
return -1;
@@ -421,13 +384,11 @@
while (r.nextFrame(reloadBuffer)) {
accessorBuild.reset(reloadBuffer.getBuffer());
for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
- if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- continue;
+ if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+ // If inserting fails for some reason (e.g. fragmentation), we need to clear the occupied frames.
+ bufferManager.clearPartition(pid);
+ return false;
}
- // for some reason (e.g. due to fragmentation) if the inserting failed,
- // we need to clear the occupied frames
- bufferManager.clearPartition(pid);
- return false;
}
}
// Closes and deletes the run file if it is already loaded into memory.
@@ -440,21 +401,14 @@
return true;
}
- private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
- ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
- this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table,
- predEvaluator, isReversed, bufferManagerForHashTable);
- }
-
- private void loadDataInMemJoin() throws HyracksDataException {
+ private void buildHashTable() throws HyracksDataException {
for (int pid = 0; pid < numOfPartitions; pid++) {
if (!spilledStatus.get(pid)) {
bufferManager.flushPartition(pid, new IFrameWriter() {
@Override
- public void open() throws HyracksDataException {
-
+ public void open() {
+ // Only the nextFrame method is needed to pass the frame to the next operator.
}
@Override
@@ -463,24 +417,21 @@
}
@Override
- public void fail() throws HyracksDataException {
-
+ public void fail() {
+ // Only the nextFrame method is needed to pass the frame to the next operator.
}
@Override
- public void close() throws HyracksDataException {
-
+ public void close() {
+ // Only the nextFrame method is needed to pass the frame to the next operator.
}
});
}
}
}
- public void initProbe() throws HyracksDataException {
-
+ public void initProbe() {
probePSizeInTups = new int[numOfPartitions];
- probeRFWriters = new RunFileWriter[numOfPartitions];
-
}
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
@@ -497,19 +448,7 @@
if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
if (spilledStatus.get(pid)) { //pid is Spilled
- while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
- int victim = pid;
- if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one
- victim = spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
- }
- if (victim < 0) { // current tuple is too big for all the free space
- flushBigProbeObjectToDisk(pid, accessorProbe, i);
- break;
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
- bufferManager.flushPartition(victim, runFileWriter);
- bufferManager.clearPartition(victim);
- }
+ processTupleProbePhase(i, pid);
} else { //pid is Resident
inMemJoiner.join(i, writer);
}
@@ -518,12 +457,44 @@
}
}
+ private void processTupleProbePhase(int tupleId, int pid) throws HyracksDataException {
+ if (!bufferManager.insertTuple(pid, accessorProbe, tupleId, tempPtr)) {
+ int recordSize =
+ VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId));
+ // If the record is modest in size (at most half a frame) and the current partition already holds tuples,
+ // that partition itself is preferred to get spilled; otherwise the biggest partition gets chosen as the victim.
+ boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2);
+ int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid
+ : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
+ // This method is called for spilled partitions, so we know this tuple is going to get written to disk
+ // sooner or later. As such, we try to reduce the number of writes that happen. So if the record is
+ // larger than the size of the victim partition, we just flush it to disk; otherwise we spill the
+ // victim, and this time the insertion should be successful.
+ //TODO:(More optimization) There might be a case where the leftover memory in the last frame of the
+ // current partition + free memory in buffer manager + memory from the victim would be sufficient to hold
+ // the record.
+ if (victim >= 0 && bufferManager.getPhysicalSize(victim) >= recordSize) {
+ RunFileWriter runFileWriter =
+ getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, victim);
+ bufferManager.flushPartition(victim, runFileWriter);
+ bufferManager.clearPartition(victim);
+ if (!bufferManager.insertTuple(pid, accessorProbe, tupleId, tempPtr)) {
+ // This should not happen if the size calculations are correct; it is only here so the query does not fail.
+ flushBigProbeObjectToDisk(pid, accessorProbe, tupleId);
+ }
+ } else {
+ flushBigProbeObjectToDisk(pid, accessorProbe, tupleId);
+ }
+ }
+ }
+
private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
throws HyracksDataException {
if (bigProbeFrameAppender == null) {
bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+ RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid);
if (!bigProbeFrameAppender.append(accessorProbe, i)) {
throw new HyracksDataException("The given tuple is too big");
}
@@ -542,55 +513,42 @@
public void releaseResource() throws HyracksDataException {
inMemJoiner.closeTable();
- closeAllSpilledPartitions(SIDE.PROBE);
+ closeAllSpilledPartitions(probeRFWriters, probeRelName);
bufferManager.close();
inMemJoiner = null;
bufferManager = null;
bufferManagerForHashTable = null;
}
- /**
- * In case of failure happens, we need to clear up the generated temporary files.
- */
- public void clearProbeTempFiles() throws HyracksDataException {
- for (int i = 0; i < probeRFWriters.length; i++) {
- if (probeRFWriters[i] != null) {
- probeRFWriters[i].erase();
- }
- }
- }
-
public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
- return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
+ return buildRFWriters[pid] == null ? null : buildRFWriters[pid].createDeleteOnCloseReader();
}
public int getBuildPartitionSizeInTup(int pid) {
- return (buildPSizeInTups[pid]);
+ return buildPSizeInTups[pid];
}
public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
- return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
+ return probeRFWriters[pid] == null ? null : probeRFWriters[pid].createDeleteOnCloseReader();
}
public int getProbePartitionSizeInTup(int pid) {
- return (probePSizeInTups[pid]);
+ return probePSizeInTups[pid];
}
public int getMaxBuildPartitionSize() {
- int max = buildPSizeInTups[0];
- for (int i = 1; i < buildPSizeInTups.length; i++) {
- if (buildPSizeInTups[i] > max) {
- max = buildPSizeInTups[i];
- }
- }
- return max;
+ return getMaxPartitionSize(buildPSizeInTups);
}
public int getMaxProbePartitionSize() {
- int max = probePSizeInTups[0];
- for (int i = 1; i < probePSizeInTups.length; i++) {
- if (probePSizeInTups[i] > max) {
- max = probePSizeInTups[i];
+ return getMaxPartitionSize(probePSizeInTups);
+ }
+
+ private int getMaxPartitionSize(int[] partitions) {
+ int max = partitions[0];
+ for (int i = 1; i < partitions.length; i++) {
+ if (partitions[i] > max) {
+ max = partitions[i];
}
}
return max;
@@ -600,6 +558,10 @@
return spilledStatus;
}
+ public int getPartitionSize(int pid) {
+ return bufferManager.getPhysicalSize(pid);
+ }
+
public void setIsReversed(boolean b) {
this.isReversed = b;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index e6da7c9..71e5fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -48,15 +48,16 @@
ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold) throws HyracksDataException {
super(tableSize, ctx, false);
this.bufferManager = bufferManager;
-
- ByteBuffer newFrame = getFrame(frameSize);
- if (newFrame == null) {
- throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+ if (tableSize > 0) {
+ ByteBuffer newFrame = getFrame(frameSize);
+ if (newFrame == null) {
+ throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+ }
+ IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+ frameCapacity = frame.capacity();
+ contents.add(frame);
+ currentOffsetInEachFrameList.add(0);
}
- IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
- frameCapacity = frame.capacity();
- contents.add(frame);
- currentOffsetInEachFrameList.add(0);
this.garbageCollectionThreshold = garbageCollectionThreshold;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
new file mode 100644
index 0000000..4c6b70f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoin;
+import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class OptimizedHybridHashJoinTest {
+ int frameSize = 32768;
+ int totalNumberOfFrames = 10;
+ IHyracksTaskContext ctx = TestUtils.create(frameSize);
+ OptimizedHybridHashJoin hhj;
+ static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
+ static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE };
+ static String probeRelName = "RelR";
+ static String buildRelName = "RelS";
+ static ITuplePairComparator comparator;
+ static RecordDescriptor probeRd;
+ static RecordDescriptor buildRd;
+ static ITuplePartitionComputer probeHpc;
+ static ITuplePartitionComputer buildHpc;
+ static IPredicateEvaluator predEval;
+ int memSizeInFrames = -1;
+ int numOfPartitions = -1;
+ boolean isLeftOuter = false;
+ static int[] probeKeys = { 0 };
+ static int[] buildKeys = { 0 };
+ private final Random rnd = new Random(50);
+
+ @BeforeClass
+ public static void classSetUp() {
+ comparator = Mockito.mock(ITuplePairComparator.class);
+ probeHpc = new FieldHashPartitionComputerFamily(probeKeys, propHashFunctionFactories).createPartitioner(0);
+ buildHpc = new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories).createPartitioner(0);
+ }
+
+ @Test
+ public void SmallRecords_AllRelationsInMemory() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateIntFrame(frame);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ memSizeInFrames = 55;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ @Test
+ public void SmallRecords_SomeRelationsInMemory() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateIntFrame(frame);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ memSizeInFrames = 40;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ @Test
+ public void SmallRecords_AllRelationsSpill() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateIntFrame(frame);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ memSizeInFrames = 5;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ @Test
+ public void LargeRecords_AllRelationsInMemory() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ memSizeInFrames = 55;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ @Test
+ public void LargeRecords_SomeRelationsInMemory() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ memSizeInFrames = 40;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ @Test
+ public void LargeRecords_AllRelationsSpill() throws HyracksDataException {
+
+ VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+ generateStringFrame(frame, ctx.getInitialFrameSize() * 3);
+ probeRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ buildRd = new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+ memSizeInFrames = 5;
+ numOfPartitions = 5;
+ testJoin(memSizeInFrames, numOfPartitions, frame);
+ }
+
+ private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
+
+ hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator,
+ probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+
+ hhj.initBuild();
+
+ for (int i = 0; i < totalNumberOfFrames; i++) {
+ hhj.build(frame.getBuffer());
+ }
+
+ hhj.closeBuild();
+ checkOneFrameReservedPerSpilledPartitions();
+ IFrameWriter writer = new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ //Not implemented as this writer is only used to pass the frames of the given partition
+ //to the in-memory joiner. As such, only nextFrame is important.
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ //Not implemented as this writer is only used to pass the frames of the given partition
+ //to the in-memory joiner. As such, only nextFrame is important.
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ //Not implemented as this writer is only used to pass the frames of the given partition
+ //to the in-memory joiner. As such, only nextFrame is important.
+ }
+ };
+ hhj.initProbe();
+ for (int i = 0; i < totalNumberOfFrames; i++) {
+ hhj.probe(frame.getBuffer(), writer);
+ checkOneFrameReservedPerSpilledPartitions();
+ }
+
+ }
+
+ private void checkOneFrameReservedPerSpilledPartitions() throws HyracksDataException {
+ //Make sure that there is one frame reserved for each spilled partition.
+ int frameSize = ctx.getInitialFrameSize();
+ int totalSize = 0;
+ int inMemTuples = 0;
+ BitSet spilledStatus = hhj.getPartitionStatus();
+ for (int i = spilledStatus.nextClearBit(0); i >= 0 && i < numOfPartitions; i =
+ spilledStatus.nextClearBit(i + 1)) {
+ totalSize += hhj.getPartitionSize(i);
+ inMemTuples += hhj.getBuildPartitionSizeInTup(i);
+ }
+ if (memSizeInFrames * frameSize - totalSize
+ - SerializableHashTable.getExpectedTableByteSize(inMemTuples, frameSize) < spilledStatus.cardinality()
+ * frameSize) {
+ throw new HyracksDataException("There should be at least one frame reserved for each spilled partition.");
+ }
+ }
+
+ private void generateIntFrame(VSizeFrame frame) throws HyracksDataException {
+ int fieldCount = 1;
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(frame, true);
+ while (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ TupleUtils.createIntegerTuple(tb, tuple, rnd.nextInt());
+ tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+ }
+ }
+
+ private void generateStringFrame(VSizeFrame frame, int length) throws HyracksDataException {
+ int fieldCount = 1;
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(frame, true);
+ ISerializerDeserializer[] fieldSerdes = { new UTF8StringSerializerDeserializer() };
+ String data = "";
+ for (int i = 0; i < length; i++) {
+ data += "X";
+ }
+ TupleUtils.createTuple(tb, tuple, fieldSerdes, data);
+ tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+ appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ }
+}