1. after one join, need to copy the frame
2. extract the memory reload as method
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@415 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 28251a5..928e9df 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -16,127 +16,144 @@
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
public class NestedLoopJoin {
- private final FrameTupleAccessor accessorInner;
- private final FrameTupleAccessor accessorOuter;
- private final FrameTupleAppender appender;
- private final ITuplePairComparator tpComparator;
- private final ByteBuffer outBuffer;
- private final ByteBuffer innerBuffer;
- private final List<ByteBuffer> outBuffers;
- private final int memSize;
- private final IHyracksStageletContext ctx;
- private RunFileReader runFileReader;
- private int currentMemSize = 0;
- private final RunFileWriter runFileWriter;
+ private final FrameTupleAccessor accessorInner;
+ private final FrameTupleAccessor accessorOuter;
+ private final FrameTupleAppender appender;
+ private final ITuplePairComparator tpComparator;
+ private final ByteBuffer outBuffer;
+ private final ByteBuffer innerBuffer;
+ private final List<ByteBuffer> outBuffers;
+ private final int memSize;
+ private final IHyracksStageletContext ctx;
+ private RunFileReader runFileReader;
+ private int currentMemSize = 0;
+ private final RunFileWriter runFileWriter;
- public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
- ITuplePairComparator comparators, int memSize) throws HyracksDataException {
- this.accessorInner = accessor1;
- this.accessorOuter = accessor0;
- this.appender = new FrameTupleAppender(ctx.getFrameSize());
- this.tpComparator = comparators;
- this.outBuffer = ctx.allocateFrame();
- this.innerBuffer = ctx.allocateFrame();
- this.appender.reset(outBuffer, true);
- this.outBuffers = new ArrayList<ByteBuffer>();
- this.memSize = memSize;
- this.ctx = ctx;
+ public NestedLoopJoin(IHyracksStageletContext ctx,
+ FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+ ITuplePairComparator comparators, int memSize)
+ throws HyracksDataException {
+ this.accessorInner = accessor1;
+ this.accessorOuter = accessor0;
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.tpComparator = comparators;
+ this.outBuffer = ctx.allocateFrame();
+ this.innerBuffer = ctx.allocateFrame();
+ this.appender.reset(outBuffer, true);
+ this.outBuffers = new ArrayList<ByteBuffer>();
+ this.memSize = memSize;
+ this.ctx = ctx;
- FileReference file = ctx.getJobletContext().createWorkspaceFile(
- this.getClass().getSimpleName() + this.toString());
- runFileWriter = new RunFileWriter(file, ctx.getIOManager());
- runFileWriter.open();
- }
+ FileReference file = ctx.getJobletContext().createWorkspaceFile(
+ this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ }
- public void cache(ByteBuffer buffer) throws HyracksDataException {
- runFileWriter.nextFrame(buffer);
- System.out.println(runFileWriter.getFileSize());
- }
+ public void cache(ByteBuffer buffer) throws HyracksDataException {
+ runFileWriter.nextFrame(buffer);
+ System.out.println(runFileWriter.getFileSize());
+ }
- public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
- if (outBuffers.size() < memSize - 3) {
- ByteBuffer outerBufferCopy = ctx.allocateFrame();
- FrameUtils.copy(outerBuffer, outerBufferCopy);
- outBuffers.add(outerBufferCopy);
- currentMemSize++;
- return;
- }
- if (currentMemSize < memSize - 3) {
- FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
- currentMemSize++;
- return;
- }
- for (ByteBuffer outBuffer : outBuffers) {
- runFileReader = runFileWriter.createReader();
- runFileReader.open();
- while (runFileReader.nextFrame(innerBuffer)) {
- blockJoin(outBuffer, innerBuffer, writer);
- }
- runFileReader.close();
- }
- currentMemSize = 0;
- }
+ public void join(ByteBuffer outerBuffer, IFrameWriter writer)
+ throws HyracksDataException {
+ if (outBuffers.size() < memSize - 3) {
+ createAndCopyFrame(outerBuffer);
+ return;
+ }
+ if (currentMemSize < memSize - 3) {
+ reloadFrame(outerBuffer);
+ return;
+ }
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ currentMemSize = 0;
+ reloadFrame(outerBuffer);
+ }
- private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
- throws HyracksDataException {
- accessorOuter.reset(outerBuffer);
- accessorInner.reset(innerBuffer);
- int tupleCount0 = accessorOuter.getTupleCount();
- int tupleCount1 = accessorInner.getTupleCount();
+ private void createAndCopyFrame(ByteBuffer outerBuffer) {
+ ByteBuffer outerBufferCopy = ctx.allocateFrame();
+ FrameUtils.copy(outerBuffer, outerBufferCopy);
+ outBuffers.add(outerBufferCopy);
+ currentMemSize++;
+ }
- for (int i = 0; i < tupleCount0; ++i) {
- for (int j = 0; j < tupleCount1; ++j) {
- int c = compare(accessorOuter, i, accessorInner, j);
- if (c == 0) {
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
- throw new IllegalStateException();
- }
- }
- }
- }
- }
- }
+ private void reloadFrame(ByteBuffer outerBuffer) {
+ outBuffers.get(currentMemSize).clear();
+ FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+ currentMemSize++;
+ }
- public void closeCache() throws HyracksDataException {
- if (runFileWriter != null) {
- runFileWriter.close();
- }
- }
+ private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer,
+ IFrameWriter writer) throws HyracksDataException {
+ accessorOuter.reset(outerBuffer);
+ accessorInner.reset(innerBuffer);
+ int tupleCount0 = accessorOuter.getTupleCount();
+ int tupleCount1 = accessorInner.getTupleCount();
- public void closeJoin(IFrameWriter writer) throws HyracksDataException {
- for (ByteBuffer outBuffer : outBuffers) {
- runFileReader = runFileWriter.createReader();
- runFileReader.open();
- while (runFileReader.nextFrame(innerBuffer)) {
- blockJoin(outBuffer, innerBuffer, writer);
- }
- runFileReader.close();
- }
- outBuffers.clear();
- currentMemSize = 0;
+ for (int i = 0; i < tupleCount0; ++i) {
+ for (int j = 0; j < tupleCount1; ++j) {
+ int c = compare(accessorOuter, i, accessorInner, j);
+ if (c == 0) {
+ if (!appender.appendConcat(accessorOuter, i, accessorInner,
+ j)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i,
+ accessorInner, j)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+ }
+ }
- if (appender.getTupleCount() > 0) {
- flushFrame(outBuffer, writer);
- }
- }
+ public void closeCache() throws HyracksDataException {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
- private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- buffer.position(0);
- buffer.limit(buffer.capacity());
- writer.nextFrame(buffer);
- buffer.position(0);
- buffer.limit(buffer.capacity());
- }
+ public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ outBuffers.clear();
+ currentMemSize = 0;
- private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
- throws HyracksDataException {
- int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
+ if (appender.getTupleCount() > 0) {
+ flushFrame(outBuffer, writer);
+ }
+ }
+
+ private void flushFrame(ByteBuffer buffer, IFrameWriter writer)
+ throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
+
+ private int compare(FrameTupleAccessor accessor0, int tIndex0,
+ FrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
}