1. after one join, need to copy the frame
2. extract the memory reload as method

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@415 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 28251a5..928e9df 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -16,127 +16,144 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 
 public class NestedLoopJoin {
-    private final FrameTupleAccessor accessorInner;
-    private final FrameTupleAccessor accessorOuter;
-    private final FrameTupleAppender appender;
-    private final ITuplePairComparator tpComparator;
-    private final ByteBuffer outBuffer;
-    private final ByteBuffer innerBuffer;
-    private final List<ByteBuffer> outBuffers;
-    private final int memSize;
-    private final IHyracksStageletContext ctx;
-    private RunFileReader runFileReader;
-    private int currentMemSize = 0;
-    private final RunFileWriter runFileWriter;
+	private final FrameTupleAccessor accessorInner;
+	private final FrameTupleAccessor accessorOuter;
+	private final FrameTupleAppender appender;
+	private final ITuplePairComparator tpComparator;
+	private final ByteBuffer outBuffer;
+	private final ByteBuffer innerBuffer;
+	private final List<ByteBuffer> outBuffers;
+	private final int memSize;
+	private final IHyracksStageletContext ctx;
+	private RunFileReader runFileReader;
+	private int currentMemSize = 0;
+	private final RunFileWriter runFileWriter;
 
-    public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
-        this.accessorInner = accessor1;
-        this.accessorOuter = accessor0;
-        this.appender = new FrameTupleAppender(ctx.getFrameSize());
-        this.tpComparator = comparators;
-        this.outBuffer = ctx.allocateFrame();
-        this.innerBuffer = ctx.allocateFrame();
-        this.appender.reset(outBuffer, true);
-        this.outBuffers = new ArrayList<ByteBuffer>();
-        this.memSize = memSize;
-        this.ctx = ctx;
+	public NestedLoopJoin(IHyracksStageletContext ctx,
+			FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+			ITuplePairComparator comparators, int memSize)
+			throws HyracksDataException {
+		this.accessorInner = accessor1;
+		this.accessorOuter = accessor0;
+		this.appender = new FrameTupleAppender(ctx.getFrameSize());
+		this.tpComparator = comparators;
+		this.outBuffer = ctx.allocateFrame();
+		this.innerBuffer = ctx.allocateFrame();
+		this.appender.reset(outBuffer, true);
+		this.outBuffers = new ArrayList<ByteBuffer>();
+		this.memSize = memSize;
+		this.ctx = ctx;
 
-        FileReference file = ctx.getJobletContext().createWorkspaceFile(
-                this.getClass().getSimpleName() + this.toString());
-        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
-        runFileWriter.open();
-    }
+		FileReference file = ctx.getJobletContext().createWorkspaceFile(
+				this.getClass().getSimpleName() + this.toString());
+		runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+		runFileWriter.open();
+	}
 
-    public void cache(ByteBuffer buffer) throws HyracksDataException {
-        runFileWriter.nextFrame(buffer);
-        System.out.println(runFileWriter.getFileSize());
-    }
+	public void cache(ByteBuffer buffer) throws HyracksDataException {
+		runFileWriter.nextFrame(buffer);
+		System.out.println(runFileWriter.getFileSize());
+	}
 
-    public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
-        if (outBuffers.size() < memSize - 3) {
-            ByteBuffer outerBufferCopy = ctx.allocateFrame();
-            FrameUtils.copy(outerBuffer, outerBufferCopy);
-            outBuffers.add(outerBufferCopy);
-            currentMemSize++;
-            return;
-        }
-        if (currentMemSize < memSize - 3) {
-            FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
-            currentMemSize++;
-            return;
-        }
-        for (ByteBuffer outBuffer : outBuffers) {
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
-                blockJoin(outBuffer, innerBuffer, writer);
-            }
-            runFileReader.close();
-        }
-        currentMemSize = 0;
-    }
+	public void join(ByteBuffer outerBuffer, IFrameWriter writer)
+			throws HyracksDataException {
+		if (outBuffers.size() < memSize - 3) {
+			createAndCopyFrame(outerBuffer);
+			return;
+		}
+		if (currentMemSize < memSize - 3) {
+			reloadFrame(outerBuffer);
+			return;
+		}
+		for (ByteBuffer outBuffer : outBuffers) {
+			runFileReader = runFileWriter.createReader();
+			runFileReader.open();
+			while (runFileReader.nextFrame(innerBuffer)) {
+				blockJoin(outBuffer, innerBuffer, writer);
+			}
+			runFileReader.close();
+		}
+		currentMemSize = 0;
+		reloadFrame(outerBuffer);
+	}
 
-    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
-            throws HyracksDataException {
-        accessorOuter.reset(outerBuffer);
-        accessorInner.reset(innerBuffer);
-        int tupleCount0 = accessorOuter.getTupleCount();
-        int tupleCount1 = accessorInner.getTupleCount();
+	private void createAndCopyFrame(ByteBuffer outerBuffer) {
+		ByteBuffer outerBufferCopy = ctx.allocateFrame();
+		FrameUtils.copy(outerBuffer, outerBufferCopy);
+		outBuffers.add(outerBufferCopy);
+		currentMemSize++;
+	}
 
-        for (int i = 0; i < tupleCount0; ++i) {
-            for (int j = 0; j < tupleCount1; ++j) {
-                int c = compare(accessorOuter, i, accessorInner, j);
-                if (c == 0) {
-                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
-                        flushFrame(outBuffer, writer);
-                        appender.reset(outBuffer, true);
-                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
-                            throw new IllegalStateException();
-                        }
-                    }
-                }
-            }
-        }
-    }
+	private void reloadFrame(ByteBuffer outerBuffer) {
+		outBuffers.get(currentMemSize).clear();
+		FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+		currentMemSize++;
+	}
 
-    public void closeCache() throws HyracksDataException {
-        if (runFileWriter != null) {
-            runFileWriter.close();
-        }
-    }
+	private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer,
+			IFrameWriter writer) throws HyracksDataException {
+		accessorOuter.reset(outerBuffer);
+		accessorInner.reset(innerBuffer);
+		int tupleCount0 = accessorOuter.getTupleCount();
+		int tupleCount1 = accessorInner.getTupleCount();
 
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        for (ByteBuffer outBuffer : outBuffers) {
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
-                blockJoin(outBuffer, innerBuffer, writer);
-            }
-            runFileReader.close();
-        }
-        outBuffers.clear();
-        currentMemSize = 0;
+		for (int i = 0; i < tupleCount0; ++i) {
+			for (int j = 0; j < tupleCount1; ++j) {
+				int c = compare(accessorOuter, i, accessorInner, j);
+				if (c == 0) {
+					if (!appender.appendConcat(accessorOuter, i, accessorInner,
+							j)) {
+						flushFrame(outBuffer, writer);
+						appender.reset(outBuffer, true);
+						if (!appender.appendConcat(accessorOuter, i,
+								accessorInner, j)) {
+							throw new IllegalStateException();
+						}
+					}
+				}
+			}
+		}
+	}
 
-        if (appender.getTupleCount() > 0) {
-            flushFrame(outBuffer, writer);
-        }
-    }
+	public void closeCache() throws HyracksDataException {
+		if (runFileWriter != null) {
+			runFileWriter.close();
+		}
+	}
 
-    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-        writer.nextFrame(buffer);
-        buffer.position(0);
-        buffer.limit(buffer.capacity());
-    }
+	public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+		for (ByteBuffer outBuffer : outBuffers) {
+			runFileReader = runFileWriter.createReader();
+			runFileReader.open();
+			while (runFileReader.nextFrame(innerBuffer)) {
+				blockJoin(outBuffer, innerBuffer, writer);
+			}
+			runFileReader.close();
+		}
+		outBuffers.clear();
+		currentMemSize = 0;
 
-    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
-            throws HyracksDataException {
-        int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
-        if (c != 0) {
-            return c;
-        }
-        return 0;
-    }
+		if (appender.getTupleCount() > 0) {
+			flushFrame(outBuffer, writer);
+		}
+	}
+
+	private void flushFrame(ByteBuffer buffer, IFrameWriter writer)
+			throws HyracksDataException {
+		buffer.position(0);
+		buffer.limit(buffer.capacity());
+		writer.nextFrame(buffer);
+		buffer.position(0);
+		buffer.limit(buffer.capacity());
+	}
+
+	private int compare(FrameTupleAccessor accessor0, int tIndex0,
+			FrameTupleAccessor accessor1, int tIndex1)
+			throws HyracksDataException {
+		int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+		if (c != 0) {
+			return c;
+		}
+		return 0;
+	}
 }