Make HHJ (hybrid hash join) code available for review: replace the InMemoryHashJoin pointer-array table with the spillable ISerializableTable/SerializableHashTable, thread the table through the Grace/Hybrid/In-Memory hash join operator descriptors, and expose RunFileReader.getFileSize().
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@665 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
index 063d0f7..d0148f3 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileReader.java
@@ -42,4 +42,8 @@
public void close() throws HyracksDataException {
ioManager.close(handle);
}
+
+ public long getFileSize() {
+ return size;
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 7f33bc4..e0035e7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -50,6 +50,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int RPARTITION_ACTIVITY_ID = 0;
@@ -304,15 +306,18 @@
ByteBuffer buffer = ctx.allocateFrame();// input
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
RunFileWriter buildWriter = buildWriters[partitionid];
RunFileWriter probeWriter = probeWriters[partitionid];
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
+ table.reset();
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
// build
if (buildWriter != null) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 23064f0..66aa8fb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -51,6 +51,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
@@ -306,10 +308,11 @@
ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
state.joiner = new InMemoryHashJoin(ctx, tableSize,
new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
- comparators), isLeftOuter, nullWriters1);
+ comparators), isLeftOuter, nullWriters1, table);
bufferForPartitions = new ByteBuffer[state.nPartitions];
state.fWriters = new RunFileWriter[state.nPartitions];
for (int i = 0; i < state.nPartitions; i++) {
@@ -487,11 +490,11 @@
if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
-
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
- nullWriters1);
+ nullWriters1, table);
if (buildWriter != null) {
RunFileReader buildReader = buildWriter.createReader();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index fad12c1..3e5e30f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -29,9 +29,11 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
public class InMemoryHashJoin {
- private final Link[] table;
+
private final List<ByteBuffer> buffers;
private final FrameTupleAccessor accessorBuild;
private final ITuplePartitionComputer tpcBuild;
@@ -42,13 +44,18 @@
private final ByteBuffer outBuffer;
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuild;
-
+ private final ISerializableTable table;
+ private final int tableSize;
+ private final TuplePointer storedTuplePointer;
+
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
- FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
throws HyracksDataException {
- table = new Link[tableSize];
- buffers = new ArrayList<ByteBuffer>();
+ this.tableSize = tableSize;
+ this.table = table;
+ storedTuplePointer = new TuplePointer();
+ buffers = new ArrayList<ByteBuffer>();
this.accessorBuild = accessor1;
this.tpcBuild = tpc1;
this.accessorProbe = accessor0;
@@ -77,13 +84,10 @@
accessorBuild.reset(buffer);
int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpcBuild.partition(accessorBuild, i, table.length);
- long tPointer = (((long) bIndex) << 32) + i;
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- link.add(tPointer);
+ int entry = tpcBuild.partition(accessorBuild, i, tableSize);
+ storedTuplePointer.frameIndex = bIndex;
+ storedTuplePointer.tupleIndex = i;
+ table.insert(entry, storedTuplePointer);
}
}
@@ -91,28 +95,29 @@
accessorProbe.reset(buffer);
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
- int entry = tpcProbe.partition(accessorProbe, i, table.length);
- Link link = table[entry];
+ int entry = tpcProbe.partition(accessorProbe, i, tableSize);
boolean matchFound = false;
- if (link != null) {
- for (int j = 0; j < link.size; ++j) {
- long pointer = link.pointers[j];
- int bIndex = (int) ((pointer >> 32) & 0xffffffff);
- int tIndex = (int) (pointer & 0xffffffff);
- accessorBuild.reset(buffers.get(bIndex));
- int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
- if (c == 0) {
- matchFound = true;
+ int offset = 0;
+ do {
+ table.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ int bIndex = storedTuplePointer.frameIndex;
+ int tIndex = storedTuplePointer.tupleIndex;
+ accessorBuild.reset(buffers.get(bIndex));
+ int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
+ if (c == 0) {
+ matchFound = true;
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- throw new IllegalStateException();
- }
+ throw new IllegalStateException();
}
}
}
- }
+ } while (true);
+
if (!matchFound && isLeftOuter) {
if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 4a13e16..a1fc0b8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -45,6 +45,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final int BUILD_ACTIVITY_ID = 0;
@@ -161,10 +163,11 @@
.createPartitioner();
state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
state.joiner = new InMemoryHashJoin(ctx, tableSize,
new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
- comparators), isLeftOuter, nullWriters1);
+ comparators), isLeftOuter, nullWriters1, table);
}
@Override