Merge branch 'master' into westmann/nljoin
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 994b0ef..52f1198 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -100,14 +100,14 @@
             reloadFrame(outerBuffer);
             return;
         }
-        for (ByteBuffer outBuffer : outBuffers) {
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
+        runFileReader = runFileWriter.createReader();
+        runFileReader.open();
+        while (runFileReader.nextFrame(innerBuffer)) {
+            for (ByteBuffer outBuffer : outBuffers) {
                 blockJoin(outBuffer, innerBuffer, writer);
             }
-            runFileReader.close();
         }
+        runFileReader.close();
         currentMemSize = 0;
         reloadFrame(outerBuffer);
     }
@@ -144,14 +144,14 @@
             }
 
             if (!matchFound && isLeftOuter) {
-                if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
-                        nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+                final int[] ntFieldEndOffsets = nullTupleBuilder.getFieldEndOffsets();
+                final byte[] ntByteArray = nullTupleBuilder.getByteArray();
+                final int ntSize = nullTupleBuilder.getSize();
+                if (!appender.appendConcat(accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, ntSize)) {
                     flushFrame(outBuffer, writer);
                     appender.reset(outBuffer, true);
-                    if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
-                            nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
-                        int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i)
-                                + nullTupleBuilder.getSize();
+                    if (!appender.appendConcat(accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, ntSize)) {
+                        int tSize = accessorOuter.getTupleEndOffset(i) - accessorOuter.getTupleStartOffset(i) + ntSize;
                         throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
                                 + appender.getBuffer().capacity() + ")");
                     }
@@ -169,32 +169,27 @@
     	}
     }
     
-    private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException{
-    	if(!isReversed){
-    		if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
-                flushFrame(outBuffer, writer);
-                appender.reset(outBuffer, true);
-                if (!appender.appendConcat(accessorOuter, outerTupleId, accessorInner, innerTupleId)) {
-                    int tSize = accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId)
-                            + accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId);
-                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
+    private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException {
+        if (!isReversed) {
+            appendResultToFrame(accessorOuter, outerTupleId, accessorInner, innerTupleId, writer);
+        } else {
+            // Role Reversal Optimization is triggered: emit the inner tuple first, then the outer
+            appendResultToFrame(accessorInner, innerTupleId, accessorOuter, outerTupleId, writer);
+        }
+    }
+
+    private void appendResultToFrame(FrameTupleAccessor accessor1, int tupleId1, FrameTupleAccessor accessor2,
+            int tupleId2, IFrameWriter writer) throws HyracksDataException {
+        if (!appender.appendConcat(accessor1, tupleId1, accessor2, tupleId2)) {
+            flushFrame(outBuffer, writer);
+            appender.reset(outBuffer, true);
+            if (!appender.appendConcat(accessor1, tupleId1, accessor2, tupleId2)) {
+                int tSize = accessor1.getTupleEndOffset(tupleId1) - accessor1.getTupleStartOffset(tupleId1)
+                        + accessor2.getTupleEndOffset(tupleId2) - accessor2.getTupleStartOffset(tupleId2);
+                throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
+                        + appender.getBuffer().capacity() + ")");
             }
-    	}
-    	else{ //Role Reversal Optimization is triggered
-    		if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
-                flushFrame(outBuffer, writer);
-                appender.reset(outBuffer, true);
-                if (!appender.appendConcat(accessorInner, innerTupleId, accessorOuter, outerTupleId)) {
-                    int tSize = accessorInner.getTupleEndOffset(innerTupleId) - accessorInner.getTupleStartOffset(innerTupleId)+
-                    		accessorOuter.getTupleEndOffset(outerTupleId) - accessorOuter.getTupleStartOffset(outerTupleId);
-                    throw new HyracksDataException("Record size (" + tSize + ") larger than frame size ("
-                            + appender.getBuffer().capacity() + ")");
-                }
-            }
-    	}
-    	
+        }
     }
     
     public void closeCache() throws HyracksDataException {
@@ -204,15 +199,14 @@
     }
 
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        for (int i = 0; i < currentMemSize; i++) {
-            ByteBuffer outBuffer = outBuffers.get(i);
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
-                blockJoin(outBuffer, innerBuffer, writer);
+        runFileReader = runFileWriter.createReader();
+        runFileReader.open();
+        while (runFileReader.nextFrame(innerBuffer)) {
+            for (int i = 0; i < currentMemSize; i++) {
+                blockJoin(outBuffers.get(i), innerBuffer, writer);
             }
-            runFileReader.close();
         }
+        runFileReader.close();
         outBuffers.clear();
         currentMemSize = 0;