Fixed dropped tuples bug in HHJ
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@179 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 4381bd1..a02156d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -299,7 +299,6 @@
for (int i = 0; i < B; i++) {
try {
files[i] = ctx.getResourceManager().createFile(relationName, null);
- files[i].deleteOnExit();
bufferForPartitions[i] = ctx.getResourceManager().allocateFrame();
} catch (IOException e) {
throw new HyracksDataException(e);
@@ -374,7 +373,6 @@
for (int i = 0; i < B; i++) {
try {
filesS[i] = ctx.getResourceManager().createFile(largeRelation, null);
- filesS[i].deleteOnExit();
bufferForPartitions[i] = ctx.getResourceManager().allocateFrame();
} catch (IOException e) {
throw new HyracksDataException(e);
@@ -442,7 +440,6 @@
if (wChannel == null) {
wChannel = new RandomAccessFile(filesS[entry], "rw").getChannel();
channelsS[entry] = wChannel;
- wChannel = channelsS[entry];
}
wChannel.write(outbuf);
numWriteI2++;
@@ -468,17 +465,18 @@
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(B, hpcf0).createPartitioner();
ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(B, hpcf1).createPartitioner();
if (memoryForHashtable != memsize - 2) {
- int[] memRi = new int[B];
for (int i = 0; i < B; i++) {
try {
- FileChannel wChannel = channelsS[i];
- if (wChannel != null) {
- ByteBuffer outbuf = bufferForPartitions[i];
- accessor1.reset(outbuf);
- if (accessor1.getTupleCount() > 0) {
- wChannel.write(outbuf);
- numWriteI2++;
+ ByteBuffer buf = bufferForPartitions[i];
+ accessor1.reset(buf);
+ if (accessor1.getTupleCount() > 0) {
+ FileChannel wChannel = channelsS[i];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(filesS[i], "rw").getChannel();
+ channelsS[i] = wChannel;
}
+ wChannel.write(buf);
+ numWriteI2++;
}
} catch (IOException e) {
throw new HyracksDataException(e);
@@ -510,7 +508,6 @@
FrameUtils.copy(inBuffer, copyBuffer);
joiner.build(copyBuffer);
inBuffer.clear();
- memRi[partitionid]++;
state = inChannel.read(inBuffer);
}
appender.reset(outBuffer, false);
@@ -541,7 +538,6 @@
env.set(JOINER0, null);
env.set(MEM_HASHTABLE, null);
env.set(NUM_PARTITION, null);
-
}
@Override