Merge branch 'master' into yingyi/fullstack_fix
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index 965ade7..eddc4df 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,6 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
-
+
public void deallocateFrames(int frameCount);
-}
\ No newline at end of file
+}
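The interface change above adds deallocateFrames(int frameCount) as the counterpart to allocateFrame(), so operators that cache frame-sized buffers can report them back to the context once the cached state is dropped; later hunks in this merge apply exactly that pattern in InMemoryHashJoin.closeJoin() and SerializableHashTable.close(). A minimal usage sketch of the intended pairing (the local names and helper calls here are hypothetical, not part of the patch):

    // Hypothetical sketch: cache frames obtained from the context, then report
    // how many were released once the buffered state is no longer needed.
    List<ByteBuffer> cached = new ArrayList<ByteBuffer>();
    while (moreInputToBuffer()) {                    // hypothetical input check
        ByteBuffer frame = ctx.allocateFrame();      // frame-sized buffer from the context
        fillFrame(frame);                            // hypothetical fill step
        cached.add(frame);
    }
    // ... consume the cached frames ...
    int nFrames = cached.size();
    cached.clear();
    ctx.deallocateFrames(nFrames);                   // report the same count back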
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index 3ea81ef..97ce664 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -46,5 +46,4 @@
// TODO Auto-generated method stub
}
-
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 20075ff..fa9b6b3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -113,7 +113,7 @@
public ByteBuffer allocateFrame() throws HyracksDataException {
return joblet.allocateFrame();
}
-
+
@Override
public void deallocateFrames(int frameCount) {
joblet.deallocateFrames(frameCount);
@@ -241,7 +241,8 @@
sem.acquire();
final int cIdx = i;
executor.execute(new Runnable() {
- public void run() {
+ @Override
+ public void run() {
if (aborted) {
return;
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index b52a54e..33e8e034 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -32,7 +32,7 @@
return new IValueParser() {
@Override
public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
- int n = 0;
+ long n = 0;
int sign = 1;
int i = 0;
boolean pre = true;
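The only functional change in this hunk widens the accumulator in LongParserFactory from int to long: the parser builds the value digit by digit in n, so an int accumulator silently wraps for inputs beyond Integer.MAX_VALUE even though the parsed field is a long. A standalone illustration of the difference (not the factory's code):

    // Accumulating the digits of a value larger than Integer.MAX_VALUE into an
    // int wraps around, while a long accumulator keeps the exact value.
    String text = "3000000000";            // > Integer.MAX_VALUE (2147483647)
    int narrow = 0;
    long wide = 0;
    for (int i = 0; i < text.length(); i++) {
        int digit = text.charAt(i) - '0';
        narrow = narrow * 10 + digit;      // overflows and ends up negative
        wide = wide * 10 + digit;          // remains 3000000000
    }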
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 60e9c40..86d738f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -33,7 +33,8 @@
import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
public class InMemoryHashJoin {
-
+
+ private final IHyracksTaskContext ctx;
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
@@ -61,7 +62,8 @@
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
- this.tableSize = tableSize;
+ this.ctx = ctx;
+ this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<ByteBuffer>();
@@ -146,6 +148,9 @@
if (appender.getTupleCount() > 0) {
flushFrame(outBuffer, writer);
}
+ int nFrames = buffers.size();
+ buffers.clear();
+ ctx.deallocateFrames(nFrames);
}
private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 979ef59..994b0ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -139,16 +139,7 @@
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
matchFound = true;
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i)
- + accessorInner.getTupleEndOffset(j) - accessorInner.getTupleStartOffset(j);
- throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
+ appendToResults(i, j, writer);
}
}
@@ -177,7 +168,35 @@
return ( (predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2) );
}
}
-
+
+ private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException{
+ if(!isReversed){
+ if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
+ int tSize = accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId)
+ + accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId);
+ throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
+ }
+ }
+ }
+ else{ //Role Reversal Optimization is triggered
+ if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
+ int tSize = accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId)+
+ accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId);
+ throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
+ }
+ }
+ }
+
+ }
+
public void closeCache() throws HyracksDataException {
if (runFileWriter != null) {
runFileWriter.close();
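The extracted appendToResults() keeps the old append/flush/retry behavior but chooses the concatenation order from isReversed: in a role-reversed run the physical outer side is the logical inner, so swapping the accessors keeps the emitted tuple layout identical to the non-reversed case. A condensed sketch of the idiom it factors out (the helper name and signature below are illustrative, not part of the patch):

    // Illustrative condensation of the append/flush/retry idiom; callers pass the
    // accessors in whichever order matches the expected output schema.
    private void appendPair(IFrameTupleAccessor first, int firstTid, IFrameTupleAccessor second,
            int secondTid, IFrameWriter writer) throws HyracksDataException {
        if (!appender.appendConcat(first, firstTid, second, secondTid)) {
            flushFrame(outBuffer, writer);        // emit the full frame downstream
            appender.reset(outBuffer, true);      // start a fresh output frame
            if (!appender.appendConcat(first, firstTid, second, secondTid)) {
                throw new HyracksDataException("Record larger than frame size ("
                        + appender.getBuffer().capacity() + ")");
            }
        }
    }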
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 4e9376d..276f60f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -120,6 +119,12 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
+ //Flags added for test purposes
+ private static boolean skipInMemoryHJ = false;
+ private static boolean forceNLJ = false;
+ private static boolean forceRR = false;
+
+
private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
@@ -305,7 +310,7 @@
public void close() throws HyracksDataException {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- LOGGER.log(Level.FINE, "OptimizedHybridHashJoin closed its build phase");
+ LOGGER.fine("OptimizedHybridHashJoin closed its build phase");
}
@Override
@@ -414,63 +419,60 @@
int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
int beforeMax = (bSize > pSize) ? bSize : pSize;
- joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1);
-
+ joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
}
writer.close();
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
- RunFileReader probeSideReader, int pid, int beforeMax, int level) throws HyracksDataException {
+ RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
- long buildPartSize = ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize();
- long probePartSize = ohhj.getProbePartitionSize(pid) / ctx.getFrameSize();
+ long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
+ long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
- LOGGER.log(Level.FINE,"Joining Partition Pairs (pid "+pid+") - (level "+level+") - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin));
-
+ LOGGER.fine("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+" - LeftOuter is "+isLeftOuter);
+
//Apply in-Mem HJ if possible
- if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
- int tabSize = -1;
-
- if (isLeftOuter || buildPartSize < probePartSize) {
- tabSize = ohhj.getBuildPartitionSizeInTup(pid);
-
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ if(!skipInMemoryHJ){
+ if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin && !isLeftOuter)) {
+ int tabSize = -1;
+ if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize)) ) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+ LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj.getBuildPartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid); //checked-confirmed
+ } else { //Case 1.2 - InMemHJ with Role Reversal
+ LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "+level+"]");
+ tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj.getProbePartitionSizeInTup(pid);
+ if (tabSize == 0) {
+ throw new HyracksDataException(
+ "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
+ }
+ //Probe Side is smaller
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid); //checked-confirmed
}
- //Build Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
- buildSideReader, probeSideReader, false, pid);
-
- }
-
- else { //Role Reversal
- tabSize = ohhj.getProbePartitionSizeInTup(pid);
- if (tabSize == 0) {
- throw new HyracksDataException(
- "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
- }
- //Probe Side is smaller
-
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
- probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
+ LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
OptimizedHybridHashJoin rHHj;
- if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") [buildSize is smaller]");
+ if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+ LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
-
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
buildSideReader.open();
rHHj.initBuild();
@@ -495,7 +497,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -504,12 +507,12 @@
continue;
}
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be useful)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse false) [coming from buildSize was smaller]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ } else { //Case 2.1.2 - Switch to NLJ
+ LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -520,21 +523,21 @@
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
- applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
- nljComparator0, false);
- } else {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rprfw, rbrfw,
- nljComparator1, false);
+ nljComparator0, false); //checked-modified
+ } else {
+ applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
+ nljComparator1, true); //checked-modified
}
}
}
- } else { //Role Reversal (Probe Side is smaller)
- LOGGER.log(Level.FINE,"\tApply RecursiveHHJ for (pid "+pid+") - (level "+level+") WITH REVERSAL [probeSize is smaller]");
+ } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
+ LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator); //checked-confirmed
rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
probeSideReader.open();
@@ -557,7 +560,8 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (afterMax < NLJ_SWITCH_THRESHOLD * beforeMax) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -566,10 +570,10 @@
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1));
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
}
- } else { //Switch to NLJ (Further recursion seems not to be effective)
- LOGGER.log(Level.FINE,"\tSwitched to NLJ for (pid "+pid+") - (level "+level+") (reverse true) [coming from probeSize was smaller]");
+ } else { //Case 2.2.2 - Switch to NLJ
+ LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -582,10 +586,10 @@
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
applyNestedLoopJoin(buildRd, probeRd, state.memForJoin, rbrfw, rprfw,
- nljComparator1, true);
+ nljComparator0, true); //checked-modified
} else {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rprfw, rbrfw,
- nljComparator0, true);
+ nljComparator1, true); //checked-modified
}
}
}
@@ -599,7 +603,6 @@
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
- LOGGER.log(Level.FINE,"\t(pid "+pid+") - applyInMemHashJoin (reversal "+reverse+")");
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -617,7 +620,7 @@
}
bReader.close();
rPartbuff.clear();
- // probe
+ // probe
pReader.open();
while (pReader.nextFrame(rPartbuff)) {
joiner.join(rPartbuff, writer);
@@ -630,10 +633,11 @@
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
throws HyracksDataException {
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
+ nlj.setIsReversed(reverse);
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
-
+
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -660,4 +664,16 @@
return op;
}
}
-}
+
+ public void setSkipInMemHJ(boolean b){
+ skipInMemoryHJ = b;
+ }
+
+ public void setForceNLJ(boolean b){
+ forceNLJ = b;
+ }
+
+ public void setForceRR(boolean b){
+ forceRR = (!isLeftOuter && b);
+ }
+}
\ No newline at end of file
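Taken together, the changes in this file thread a wasReversed flag through joinPartitionPair() so partition and table sizes are read from the correct side after a role reversal, replace LOGGER.log(Level.FINE, ...) calls with LOGGER.fine() plus case labels, and expose three static test flags (skipInMemoryHJ, forceNLJ, forceRR) for forcing specific code paths. The two strategy decisions the new structure encodes can be summarized as hypothetical helpers (the method names are illustrative; the fields and constants are the ones used in the patch):

    // Case 1 vs. Case 2: attempt an in-memory hash join only when one side fits in
    // the join memory budget (the probe side only counts for inner joins), and only
    // if the skipInMemoryHJ test flag is off.
    private boolean canApplyInMemoryHashJoin(long buildPartSize, long probePartSize) {
        return !skipInMemoryHJ
                && (buildPartSize < state.memForJoin
                        || (probePartSize < state.memForJoin && !isLeftOuter));
    }

    // Case x.1 vs. x.2: keep recursing with hybrid hash join only while
    // repartitioning still shrinks the larger partition below the switch threshold;
    // otherwise (or when forceNLJ is set) fall back to nested-loop join.
    private boolean keepRecursing(int afterMax, int beforeMax) {
        return !forceNLJ && afterMax < NLJ_SWITCH_THRESHOLD * beforeMax;
    }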
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
index a245c9d..1ea8393 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -136,12 +136,14 @@
@Override
public void close() {
- for (int i = 0; i < headers.length; i++)
+ int nFrames = contents.size();
+ for (int i = 0; i < headers.length; i++)
headers[i] = null;
contents.clear();
frameCurrentIndex.clear();
tupleCount = 0;
currentLargestFrameIndex = 0;
+ ctx.deallocateFrames(nFrames);
}
private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index f18099b..2e4c812 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -116,4 +116,4 @@
// TODO Auto-generated method stub
}
-}
\ No newline at end of file
+}