Fix for Role Reversal Optimization in Optimized HHJ

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2933 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
     private final ISerializableTable table;
 	private final int tableSize;
     private final TuplePointer storedTuplePointer;
+    private final boolean reverseOutputOrder;	//Should we reverse the order of tuples, we are writing in output
     
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
             throws HyracksDataException {
+    	this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+    }
+    
+    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table, boolean reverse) throws HyracksDataException {
     	this.tableSize = tableSize;
        	this.table = table;
        	storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
         } else {
             nullTupleBuild = null;
         }
+    	reverseOutputOrder = reverse;
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
                 int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                 if (c == 0) {
                     matchFound = true;
-                    if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                        flushFrame(outBuffer, writer);
-                        appender.reset(outBuffer, true);
-                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                            throw new IllegalStateException();
-                        }
-                    }
+                    appendToResult(i, tIndex, writer);
                 }
             } while (true);
 
             if (!matchFound && isLeftOuter) {
-                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                
+            	if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                     flushFrame(outBuffer, writer);
                     appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
                         throw new IllegalStateException();
                     }
                 }
+                
             }
         }
     }
@@ -145,23 +148,25 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
-
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 8;
-
-        long[] pointers;
-        int size;
-
-        Link() {
-            pointers = new long[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(long pointer) {
-            if (size >= pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+    
+    private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException{
+    	if(!reverseOutputOrder){
+    		if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                    throw new IllegalStateException();
+                }
             }
-            pointers[size++] = pointer;
-        }
+    	}
+    	else{
+    		if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                    throw new IllegalStateException();
+                }
+            }
+    	}
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 19479df..cf39416 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
+        
 
     }
 
@@ -254,6 +256,7 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
             
+            
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
@@ -421,34 +424,40 @@
                     //Apply in-Mem HJ if possible
                     if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
                         int tabSize = -1;
-                        if (buildPartSize < probePartSize) {
+                        
+                        if (isLeftOuter || buildPartSize < probePartSize) {
                             tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+                           
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
-                            //Build Side is smaller
-                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
-                                    buildSideReader, probeSideReader);
+                          //Build Side is smaller
+                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+                                    buildSideReader, probeSideReader, false, pid);
 
-                        } else { //Role Reversal
+                        } 
+                        
+                        else { //Role Reversal
                             tabSize = ohhj.getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
                             //Probe Side is smaller
-                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
-                                    probeSideReader, buildSideReader);
+                            
+                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+                                    probeSideReader, buildSideReader, true, pid);
                         }
                     }
                     //Apply (Recursive) HHJ
                     else {
                         OptimizedHybridHashJoin rHHj;
-                        if (buildPartSize < probePartSize) { //Build Side is smaller
+                        if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
 
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
+                           
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
                                     probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
 
@@ -491,14 +500,14 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+                                    
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
 
                                     int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                                    if (buildSideInTups < probeSideInTups) {
+                                    if (isLeftOuter || buildSideInTups < probeSideInTups) {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
                                                 nljComparator0);
                                     } else {
@@ -510,6 +519,7 @@
                         } else { //Role Reversal (Probe Side is smaller)
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
+                            
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
                                     buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
 
@@ -548,7 +558,7 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+                                    
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
@@ -572,14 +582,14 @@
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
-                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
                         throws HyracksDataException {
 
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
                             buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table);
+                            isLeftOuter, nullWriters1, table, reverse);
 
                     bReader.open();
                     rPartbuff.clear();