Cross-merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-dataflow-std/pom.xml b/fullstack/hyracks/hyracks-dataflow-std/pom.xml
index bf27d20..2cf0fdc 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-std</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hyracks-dataflow-std</name>
 
   <parent>
@@ -18,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..f86d9fb 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -256,8 +256,6 @@
 
                 outputAppender.reset(outputFrame, true);
 
-                writer.open();
-
                 if (tPointers == null) {
                     // Not sorted
                     for (int i = 0; i < tableSize; ++i) {
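
Note on the removed writer.open() above: under the usual IFrameWriter contract the component that owns a writer opens it exactly once, and this flush path was presumably opening a writer its caller had already opened. A minimal sketch of that lifecycle (drive(), frames, and the Iterator are illustrative assumptions; open/nextFrame/fail/close are the real interface methods):

    // sketch: standard IFrameWriter lifecycle, owned by one component
    static void drive(IFrameWriter writer, java.util.Iterator<java.nio.ByteBuffer> frames) throws Exception {
        writer.open();                       // opened exactly once, by the owner
        try {
            while (frames.hasNext()) {
                writer.nextFrame(frames.next()); // zero or more data frames
            }
        } catch (Exception e) {
            writer.fail();                   // signal downstream that the stream is bad
            throw e;
        } finally {
            writer.close();                  // always closed by the owner
        }
    }
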
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
     private final ISerializableTable table;
 	private final int tableSize;
     private final TuplePointer storedTuplePointer;
+    private final boolean reverseOutputOrder; // when true, emit the build-side tuple before the probe-side tuple (used for role reversal)
     
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
             throws HyracksDataException {
+        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+    }
+
+    public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+            ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
+            ISerializableTable table, boolean reverse) throws HyracksDataException {
     	this.tableSize = tableSize;
        	this.table = table;
        	storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
         } else {
             nullTupleBuild = null;
         }
+        reverseOutputOrder = reverse;
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
                 int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                 if (c == 0) {
                     matchFound = true;
-                    if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                        flushFrame(outBuffer, writer);
-                        appender.reset(outBuffer, true);
-                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
-                            throw new IllegalStateException();
-                        }
-                    }
+                    appendToResult(i, tIndex, writer);
                 }
             } while (true);
 
             if (!matchFound && isLeftOuter) {
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
                     flushFrame(outBuffer, writer);
                     appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
                         throw new IllegalStateException();
                     }
                 }
             }
         }
     }
@@ -145,23 +148,25 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
-
-    private static class Link {
-        private static final int INIT_POINTERS_SIZE = 8;
-
-        long[] pointers;
-        int size;
-
-        Link() {
-            pointers = new long[INIT_POINTERS_SIZE];
-            size = 0;
-        }
-
-        void add(long pointer) {
-            if (size >= pointers.length) {
-                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+
+    private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
+        if (!reverseOutputOrder) {
+            if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+                    throw new IllegalStateException();
+                }
             }
-            pointers[size++] = pointer;
-        }
+        } else {
+            if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                flushFrame(outBuffer, writer);
+                appender.reset(outBuffer, true);
+                if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
     }
 }
\ No newline at end of file
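
The reverse flag threaded through InMemoryHashJoin exists for role reversal in the hybrid hash join later in this patch: when the original probe relation is used as the in-memory build side, emitting the build-side tuple first restores the column order the consumer expects. A self-contained sketch of what the flag does (plain Java, illustrative names only, not the Hyracks API):

    // appendToResult in miniature: reverseOutputOrder swaps which side is emitted first
    static int[] concatTuples(int[] probeTuple, int[] buildTuple, boolean reverseOutputOrder) {
        int[] first = reverseOutputOrder ? buildTuple : probeTuple;
        int[] second = reverseOutputOrder ? probeTuple : buildTuple;
        int[] out = new int[first.length + second.length];
        System.arraycopy(first, 0, out, 0, first.length);
        System.arraycopy(second, 0, out, first.length, second.length);
        return out;
    }
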
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 7e84229..6870e71 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -14,15 +14,18 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -42,9 +45,12 @@
     private RunFileReader runFileReader;
     private int currentMemSize = 0;
     private final RunFileWriter runFileWriter;
+    private final boolean isLeftOuter;
+    private final ArrayTupleBuilder nullTupleBuilder;
 
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+            ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+            throws HyracksDataException {
         this.accessorInner = accessor1;
         this.accessorOuter = accessor0;
         this.appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -56,6 +62,19 @@
         this.memSize = memSize;
         this.ctx = ctx;
 
+        this.isLeftOuter = isLeftOuter;
+        if (isLeftOuter) {
+            int innerFieldCount = accessorInner.getFieldCount();
+            nullTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
+            DataOutput out = nullTupleBuilder.getDataOutput();
+            for (int i = 0; i < innerFieldCount; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuilder.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuilder = null;
+        }
+
         FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                 this.getClass().getSimpleName() + this.toString());
         runFileWriter = new RunFileWriter(file, ctx.getIOManager());
@@ -108,9 +127,11 @@
         int tupleCount1 = accessorInner.getTupleCount();
 
         for (int i = 0; i < tupleCount0; ++i) {
+            boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
                 int c = compare(accessorOuter, i, accessorInner, j);
                 if (c == 0) {
+                    matchFound = true;
                     if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
                         flushFrame(outBuffer, writer);
                         appender.reset(outBuffer, true);
@@ -120,6 +141,18 @@
                     }
                 }
             }
+
+            if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+                        nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+                            nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
         }
     }
 
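
NestedLoopJoin now supports left-outer semantics with the same pattern InMemoryHashJoin uses: a null tuple is built once in the constructor, and every outer tuple that finds no inner match is concatenated with it. A compact, runnable sketch of the algorithm (plain Java collections standing in for frames and accessors; all names here are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiPredicate;

    // left-outer nested-loop join in miniature; `nulls` plays the role of nullTupleBuilder
    static List<Object[]> leftOuterNlj(List<Object[]> outer, List<Object[]> inner,
            BiPredicate<Object[], Object[]> match, int innerWidth) {
        Object[] nulls = new Object[innerWidth];      // pre-built "null tuple"
        List<Object[]> result = new ArrayList<>();
        for (Object[] o : outer) {
            boolean matchFound = false;
            for (Object[] in : inner) {
                if (match.test(o, in)) {
                    matchFound = true;
                    result.add(concat(o, in));
                }
            }
            if (!matchFound) {
                result.add(concat(o, nulls));         // pad unmatched outer tuples
            }
        }
        return result;
    }

    static Object[] concat(Object[] a, Object[] b) {
        Object[] out = new Object[a.length + b.length];
        System.arraycopy(a, 0, out, 0, a.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
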
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index a699703..0be01c1 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -25,6 +25,8 @@
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -47,13 +49,18 @@
     private static final long serialVersionUID = 1L;
     private final ITuplePairComparatorFactory comparatorFactory;
     private final int memSize;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize) {
+            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.comparatorFactory = comparatorFactory;
         this.recordDescriptors[0] = recordDescriptor;
         this.memSize = memSize;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
     }
 
     @Override
@@ -111,6 +118,13 @@
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
 
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private JoinCacheTaskState state;
 
@@ -118,8 +132,11 @@
                 public void open() throws HyracksDataException {
                     state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
                     state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+                            nullWriters1);
                 }
 
                 @Override
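
For reference, a hypothetical instantiation of the widened constructor (spec, cmpFactory, outRecDesc, memSize, and nullWriterFactories are assumed to exist in the surrounding job setup):

    // left-outer nested-loop join; nullWriterFactories supplies one factory per inner field
    NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(
            spec,                // IOperatorDescriptorRegistry
            cmpFactory,          // ITuplePairComparatorFactory
            outRecDesc,          // RecordDescriptor of the concatenated output
            memSize,             // frames available to the join
            true,                // isLeftOuter
            nullWriterFactories);
    // for a plain inner join, pass (spec, cmpFactory, outRecDesc, memSize, false, null),
    // as the hybrid hash join's applyNestedLoopJoin does later in this patch
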
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 2905574..6e2b16a 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -81,6 +81,8 @@
 
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
+
+    private boolean isTableEmpty; // handles the case where the build side is empty (tableSize is 0)
 
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -89,10 +91,10 @@
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
-        this.buildHpc = probeHpc;
-        this.probeHpc = buildHpc;
-        this.buildKeys = keys0;
-        this.probeKeys = keys1;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.buildKeys = keys1;
+        this.probeKeys = keys0;
         this.comparators = comparators;
         this.rel0Name = rel0Name;
         this.rel1Name = rel1Name;
@@ -117,10 +119,10 @@
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
-        this.buildHpc = probeHpc;
-        this.probeHpc = buildHpc;
-        this.buildKeys = keys0;
-        this.probeKeys = keys1;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.buildKeys = keys1;
+        this.probeKeys = keys0;
         this.comparators = comparators;
         this.rel0Name = rel0Name;
         this.rel1Name = rel1Name;
@@ -171,6 +173,12 @@
     public void build(ByteBuffer buffer) throws HyracksDataException {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
             processTuple(i, pid);
@@ -338,6 +346,7 @@
 
         createInMemoryJoiner(inMemTupCount);
         cacheInMemJoin();
+        this.isTableEmpty = (inMemTupCount == 0);
     }
 
     private void partitionTune() throws HyracksDataException {
@@ -457,10 +466,14 @@
     }
 
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();
 
         if (numOfSpilledParts == 0) {
             inMemJoiner.join(buffer, writer);
             return;
@@ -604,4 +617,8 @@
                 + freeFramesCounter;
         return s;
     }
+
+    public boolean isTableEmpty() {
+        return this.isTableEmpty;
+    }
 }
\ No newline at end of file
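
The new isTableEmpty() flag lets the probe phase skip frames when the build side contributed no tuples to the in-memory table; the constructor changes above also fix crossed assignments (buildHpc/probeHpc and buildKeys/probeKeys were previously swapped). A sketch of the intended call sequence, using only methods shown in this diff (hj, buildFrames, probeFrames, and writer are assumed driver-side names):

    hj.initBuild();
    for (ByteBuffer frame : buildFrames) {
        hj.build(frame);
    }
    // the operator then closes the build phase; that code path (shown above) records
    // isTableEmpty when inMemTupCount == 0
    if (!hj.isTableEmpty()) {              // mirrors the guard added in nextFrame() below
        for (ByteBuffer frame : probeFrames) {
            hj.probe(frame, writer);
        }
    }
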
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 3a7ee2c..cf39416 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
 
     }
 
@@ -167,10 +169,10 @@
         ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
 
         builder.addActivity(this, phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addActivity(this, phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -253,14 +255,8 @@
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-
-            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nullWriterFactories1.length; i++) {
-                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
-                }
-            }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
@@ -278,9 +274,17 @@
                     state.memForJoin = memsize - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
-                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                            buildHpc);
+                    if (!isLeftOuter) {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                                buildHpc);
+                    } else {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                                buildHpc, isLeftOuter, nullWriterFactories1);
+                    }
                     state.hybridHJ.initBuild();
                 }
 
@@ -368,7 +372,9 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.hybridHJ.probe(buffer, writer);
+                    if (!state.hybridHJ.isTableEmpty()) {
+                        state.hybridHJ.probe(buffer, writer);
+                    }
                 }
 
                 @Override
@@ -418,34 +424,40 @@
                     //Apply in-Mem HJ if possible
                     if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
                         int tabSize = -1;
-                        if (buildPartSize < probePartSize) {
+                        if (isLeftOuter || buildPartSize < probePartSize) {
                             tabSize = ohhj.getBuildPartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
-                            //Build Side is smaller
-                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
-                                    buildSideReader, probeSideReader);
+                            //Build side is smaller (or roles are pinned for left-outer)
+                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+                                    buildSideReader, probeSideReader, false, pid);
 
                         } else { //Role Reversal
                             tabSize = ohhj.getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
                             }
                             //Probe Side is smaller
-                            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
-                                    probeSideReader, buildSideReader);
+                            applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+                                    probeSideReader, buildSideReader, true, pid);
                         }
                     }
                     //Apply (Recursive) HHJ
                     else {
                         OptimizedHybridHashJoin rHHj;
-                        if (buildPartSize < probePartSize) { //Build Side is smaller
+                        if (isLeftOuter || buildPartSize < probePartSize) { //Build side is smaller (or roles pinned for left-outer)
 
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
                                     probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
 
@@ -488,14 +500,14 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
 
                                     int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                                    if (buildSideInTups < probeSideInTups) {
+                                    if (isLeftOuter || buildSideInTups < probeSideInTups) {
                                         applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
                                                 nljComparator0);
                                     } else {
@@ -507,6 +519,7 @@
                         } else { //Role Reversal (Probe Side is smaller)
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
                                     buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
 
@@ -545,7 +558,7 @@
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
@@ -569,14 +582,14 @@
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
-                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
+                        boolean reverse, int pid)
                         throws HyracksDataException {
 
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
                             buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table);
+                            isLeftOuter, nullWriters1, table, reverse);
 
                     bReader.open();
                     rPartbuff.clear();
@@ -604,7 +617,7 @@
                         throws HyracksDataException {
 
                     NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
 
                     ByteBuffer cacheBuff = ctx.allocateFrame();
                     innerReader.open();
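
Taken together, the changes in this file pin the build side for left-outer joins (so null padding stays on the correct side) and mark role-reversed in-memory joins so InMemoryHashJoin can restore the output column order. A compact restatement of the per-partition decision, mirroring the code above (tabSize and the reader/key/hpc variables are the ones already in scope there):

    if (isLeftOuter || buildPartSize < probePartSize) {
        // keep roles: build side in memory; output is naturally probe-then-build
        int tabSize = ohhj.getBuildPartitionSizeInTup(pid);
        applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd,
                hpcRep0, hpcRep1, buildSideReader, probeSideReader, false, pid);
    } else {
        // role reversal: probe side in memory; reverse == true restores the output order
        int tabSize = ohhj.getProbePartitionSizeInTup(pid);
        applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd,
                hpcRep1, hpcRep0, probeSideReader, buildSideReader, true, pid);
    }
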
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
new file mode 100644
index 0000000..edca60a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.result;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final ResultSetId rsId;
+
+    private final boolean ordered;
+
+    private final IResultSerializerFactory resultSerializerFactory;
+
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+            IResultSerializerFactory resultSerializerFactory) throws IOException {
+        super(spec, 1, 0);
+        this.rsId = rsId;
+        this.ordered = ordered;
+        this.resultSerializerFactory = resultSerializerFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+        final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
+
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+
+        final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
+        frameOutputStream.reset(outputBuffer, true);
+        PrintStream printStream = new PrintStream(frameOutputStream);
+
+        final RecordDescriptor outRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(outRecordDesc,
+                printStream);
+
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDesc);
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            IFrameWriter datasetPartitionWriter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                try {
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition,
+                            nPartitions);
+                    datasetPartitionWriter.open();
+                    resultSerializer.init();
+                } catch (HyracksException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                frameTupleAccessor.reset(buffer);
+                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                    resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+                    if (!frameOutputStream.appendTuple()) {
+                        datasetPartitionWriter.nextFrame(outputBuffer);
+                        frameOutputStream.reset(outputBuffer, true);
+
+                        /* TODO(madhusudancs): This works under the assumption that no single serialized record is
+                         * longer than the buffer size.
+                         */
+                        resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+                        frameOutputStream.appendTuple();
+                    }
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                datasetPartitionWriter.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (frameOutputStream.getTupleCount() > 0) {
+                    datasetPartitionWriter.nextFrame(outputBuffer);
+                    frameOutputStream.reset(outputBuffer, true);
+                }
+                datasetPartitionWriter.close();
+            }
+        };
+    }
+}
\ No newline at end of file
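
ResultWriterOperatorDescriptor's nextFrame() uses the append/flush/retry discipline that recurs throughout this patch, and per the TODO above it assumes no single serialized record exceeds one frame. The pattern in isolation (tryAppend, flushCurrentFrame, and resetCurrentFrame are placeholder names for the appender- and writer-specific calls):

    void emit(Object record) {
        if (!tryAppend(record)) {          // current frame is full
            flushCurrentFrame();           // hand the filled frame downstream
            resetCurrentFrame();           // reuse the buffer as a fresh frame
            if (!tryAppend(record)) {      // record larger than one frame
                throw new IllegalStateException("record does not fit in a single frame");
            }
        }
    }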