Merged trunk -r 363:437 into branch.

Summary of the merged revisions, as reflected in the diff below: the module version moves from 0.1.4 to 0.1.5; the aggregator factories compute field offsets with 4-byte field slots; contributeSchedulingConstraints() gains an ICCApplicationContext parameter; the in-memory, Grace, and hybrid hash joins gain left-outer-join support via INullWriterFactory; and a block nested-loop join (NestedLoopJoin) is added.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@468 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
index 1ca1bd4..450f5c4 100644
--- a/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Thu Jul 29 14:32:56 PDT 2010
+#Fri May 20 19:34:04 PDT 2011
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-dataflow-std/pom.xml b/hyracks-dataflow-std/pom.xml
index 0940856..747e657 100644
--- a/hyracks-dataflow-std/pom.xml
+++ b/hyracks-dataflow-std/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-dataflow-std</artifactId>
-  <version>0.1.4</version>
+  <version>0.1.5</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.4</version>
+    <version>0.1.5</version>
   </parent>
 
   <build>
@@ -27,14 +27,14 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.4</version>
+  		<version>0.1.5</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.4</version>
+  		<version>0.1.5</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
index a99cba9..2ddffde 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
@@ -69,7 +69,7 @@
                     throws HyracksDataException {
                 count = IntegerSerializerDeserializer.getInt(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
 
             }
@@ -89,7 +89,7 @@
                     throws HyracksDataException {
                 count += IntegerSerializerDeserializer.getInt(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
             }
         };
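The 2 -> 4 change here, and the identical hunks in FloatSumAggregatorFactory, MinMaxAggregatorFactory, and SumAggregatorFactory below, correct the same offset computation: each tuple in a frame begins with a field-slot table holding one offset per field, and those slots are 4-byte integers, so skipping fieldCount * 2 bytes landed inside the slot table instead of at the field data. A minimal sketch of the corrected arithmetic, assuming the frame layout implied by these hunks; the helper class and method names are illustrative, not part of the patch:

    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

    final class FieldOffsets {
        // Absolute position of field fIndex of tuple tIndex in the frame's backing
        // array: tuple start + 4-byte-per-field slot table + relative field start.
        static int absoluteFieldStart(FrameTupleAccessor accessor, int tIndex, int fIndex) {
            return accessor.getTupleStartOffset(tIndex)
                    + accessor.getFieldCount() * 4
                    + accessor.getFieldStartOffset(tIndex, fIndex);
        }
    }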
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
index 7e6bc8c..8fd1f13 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
@@ -103,7 +103,7 @@
                     throws HyracksDataException {
                 sum = FloatSerializerDeserializer.getFloat(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
             }
 
@@ -117,7 +117,7 @@
                     throws HyracksDataException {
                 sum += FloatSerializerDeserializer.getFloat(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
index 0c3b0d5..cc999d5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
@@ -100,7 +100,7 @@
                     throws HyracksDataException {
                 minmax = FloatSerializerDeserializer.getFloat(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
 
             }
@@ -115,7 +115,7 @@
                     throws HyracksDataException {
                 minmax = FloatSerializerDeserializer.getFloat(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
 
             }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
index 64335f1..c6bcc81 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
@@ -102,7 +102,7 @@
                     throws HyracksDataException {
                 sum = IntegerSerializerDeserializer.getInt(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
             }
 
@@ -116,7 +116,7 @@
                     throws HyracksDataException {
                 sum += IntegerSerializerDeserializer.getInt(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+                        accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
                                 + accessor.getFieldStartOffset(tIndex, fIndex));
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 0eebd93..e4613dd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 7e97975..4453412 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
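Both abstract base classes change contributeSchedulingConstraints() in lock step with the IConnectorDescriptor and IOperatorDescriptor interfaces (whose hunks are not shown here): the method now also receives the cluster controller's application context. Any subclass that overrode the old two-argument form must adopt the new signature; a minimal sketch, using only the types visible in this patch:

    @Override
    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
            ICCApplicationContext appCtx) {
        // Constraint contributors may now consult application-level state through
        // appCtx when emitting constraints; the base classes keep the previous
        // do-nothing behavior.
    }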
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a038a40..6ee9ab37 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.connectors;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -52,7 +53,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         JobSpecification jobSpec = plan.getJobSpecification();
         IOperatorDescriptor consumer = jobSpec.getConsumer(this);
         IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String RELATION0 = "Rel0";
+    private static final String RELATION1 = "Rel1";
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -56,6 +58,8 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+        HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+        HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
         JoinActivityNode join = new JoinActivityNode();
 
-        builder.addTask(rpart);
-        builder.addSourceEdge(0, rpart, 0);
+        builder.addTask(part0);
+        builder.addSourceEdge(0, part0, 0);
 
-        builder.addTask(spart);
-        builder.addSourceEdge(1, spart, 0);
+        builder.addTask(part1);
+        builder.addSourceEdge(1, part1, 0);
 
         builder.addTask(join);
-        builder.addBlockingEdge(rpart, spart);
-        builder.addBlockingEdge(spart, join);
+        builder.addBlockingEdge(part0, part1);
+        builder.addBlockingEdge(part1, join);
 
         builder.addTargetEdge(0, join, 0);
     }
@@ -217,18 +241,24 @@
             }
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
-                    sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+                    buildWriters = (RunFileWriter[]) env.get(RELATION1);
+                    probeWriters = (RunFileWriter[]) env.get(RELATION0);
 
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
                     // buffer
                     int tableSize = (int) (numPartitions * recordsPerFrame * factor);
                     for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                        RunFileWriter rWriter = rWriters[partitionid];
-                        RunFileWriter sWriter = sWriters[partitionid];
-                        if (rWriter == null || sWriter == null) {
+                        RunFileWriter buildWriter = buildWriters[partitionid];
+                        RunFileWriter probeWriter = probeWriters[partitionid];
+                        if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                             continue;
                         }
                         joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                                 hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                new FrameTuplePairComparator(keys0, keys1, comparators));
+                                new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
 
                         // build
-                        RunFileReader rReader = rWriter.createReader();
-                        rReader.open();
-                        while (rReader.nextFrame(buffer)) {
-                            ByteBuffer copyBuffer = ctx.allocateFrame();
-                            FrameUtils.copy(buffer, copyBuffer);
-                            joiner.build(copyBuffer);
-                            buffer.clear();
+                        if (buildWriter != null) {
+                            RunFileReader buildReader = buildWriter.createReader();
+                            buildReader.open();
+                            while (buildReader.nextFrame(buffer)) {
+                                ByteBuffer copyBuffer = ctx.allocateFrame();
+                                FrameUtils.copy(buffer, copyBuffer);
+                                joiner.build(copyBuffer);
+                                buffer.clear();
+                            }
+                            buildReader.close();
                         }
-                        rReader.close();
 
                         // probe
-                        RunFileReader sReader = sWriter.createReader();
-                        sReader.open();
-                        while (sReader.nextFrame(buffer)) {
+                        RunFileReader probeReader = probeWriter.createReader();
+                        probeReader.open();
+                        while (probeReader.nextFrame(buffer)) {
                             joiner.join(buffer, writer);
                             buffer.clear();
                         }
-                        sReader.close();
+                        probeReader.close();
                         joiner.closeJoin(writer);
                     }
                     writer.close();
@@ -276,8 +308,8 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(RELATION1, null);
+                    env.set(RELATION0, null);
                 }
             };
             return op;
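With the rename, input 1 (RELATION1) is the build side and input 0 (RELATION0) the probe side, and the guard (buildWriter == null && !isLeftOuter) || probeWriter == null keeps a probe partition alive even when its build partition is empty, so unmatched probe tuples can be emitted with null padding. A construction sketch for the left-outer variant using the new constructor; every sizing value below is an illustrative placeholder, not a tuned setting:

    static GraceHashJoinOperatorDescriptor leftOuterGraceJoin(JobSpecification spec, int[] keys0, int[] keys1,
            IBinaryHashFunctionFactory[] hashFns, IBinaryComparatorFactory[] cmps, RecordDescriptor joinedRd,
            INullWriterFactory[] nullWriterFactories) {
        // 32 frames of memory, 1000-frame build input, 200 records per frame,
        // fudge factor 1.2 -- placeholder numbers only.
        return new GraceHashJoinOperatorDescriptor(spec, 32, 1000, 200, 1.2, keys0, keys1, hashFns, cmps,
                joinedRd, true, nullWriterFactories);
    }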
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER0 = "joiner0";
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String BUILDRELATION = "BuildRel";
+    private static final String PROBERELATION = "ProbeRel";
     private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
     private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
     private final int memsize;
     private static final long serialVersionUID = 1L;
     private final int inputsize0;
     private final double factor;
+    private final int recordsPerFrame;
     private final int[] keys0;
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int recordsPerFrame;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     /**
      * @param spec
@@ -88,19 +92,39 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.factor = factor;
+        this.recordsPerFrame = recordsPerFrame;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
 
         builder.addTask(phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addTask(phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -127,12 +151,18 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
-                ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
-                        .createPartitioner();
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories).createPartitioner();
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
                 private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
 
                     for (int i = 0; i < B; i++) {
                         ByteBuffer buf = bufferForPartitions[i];
-                        accessor0.reset(buf);
-                        if (accessor0.getTupleCount() > 0) {
+                        accessorBuild.reset(buf);
+                        if (accessorBuild.getTupleCount() > 0) {
                             write(i, buf);
                         }
                         closeWriter(i);
@@ -165,17 +195,17 @@
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
 
                     if (memoryForHashtable != memsize - 2) {
-                        accessor0.reset(buffer);
-                        int tCount = accessor0.getTupleCount();
+                        accessorBuild.reset(buffer);
+                        int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc0.partition(accessor0, i, B);
+                                entry = hpcBuild.partition(accessorBuild, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer bufBi = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessor0, i)) {
+                                    if (appender.append(accessorBuild, i)) {
                                         break;
                                     } else {
                                         write(entry, bufBi);
@@ -184,15 +214,16 @@
                                     }
                                 }
                             } else {
-                                entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftappender.append(accessor0, i)) {
+                                        if (!ftappender.append(accessorBuild, i)) {
                                             build(inBuffer);
 
                                             ftappender.reset(inBuffer, true);
-                                        } else
+                                        } else {
                                             break;
+                                        }
                                     }
                                 } else {
                                     entry %= B;
@@ -200,7 +231,7 @@
                                     ByteBuffer bufBi = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessor0, i)) {
+                                        if (appender.append(accessorBuild, i)) {
                                             break;
                                         } else {
                                             write(entry, bufBi);
@@ -255,7 +286,7 @@
                     int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
                     joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                     bufferForPartitions = new ByteBuffer[B];
                     fWriters = new RunFileWriter[B];
                     for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String largeRelation;
+        private String relationName;
 
         public PartitionAndJoinActivityNode(String relationName) {
             super();
-            this.largeRelation = relationName;
+            this.relationName = relationName;
         }
 
         @Override
@@ -315,22 +346,28 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
-                private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
-                private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories);
-                ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
 
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
                 private final ByteBuffer inBuffer = ctx.allocateFrame();
                 private final ByteBuffer outBuffer = ctx.allocateFrame();
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private ByteBuffer[] bufferForPartitions;
                 private int B;
                 private int memoryForHashtable;
@@ -339,10 +376,10 @@
                 public void open() throws HyracksDataException {
                     joiner0 = (InMemoryHashJoin) env.get(JOINER0);
                     writer.open();
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+                    buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
                     B = (Integer) env.get(NUM_PARTITION);
                     memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
-                    sWriters = new RunFileWriter[B];
+                    probeWriters = new RunFileWriter[B];
                     bufferForPartitions = new ByteBuffer[B];
                     for (int i = 0; i < B; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     if (memoryForHashtable != memsize - 2) {
-                        accessor1.reset(buffer);
-                        int tupleCount1 = accessor1.getTupleCount();
-                        for (int i = 0; i < tupleCount1; ++i) {
+                        accessorProbe.reset(buffer);
+                        int tupleCount0 = accessorProbe.getTupleCount();
+                        for (int i = 0; i < tupleCount0; ++i) {
 
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc1.partition(accessor1, i, B);
+                                entry = hpcProbe.partition(accessorProbe, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer outbuf = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessor1, i)) {
+                                    if (appender.append(accessorProbe, i)) {
                                         break;
                                     } else {
                                         write(entry, outbuf);
@@ -374,10 +411,10 @@
                                     }
                                 }
                             } else {
-                                entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftap.append(accessor1, i)) {
+                                        if (!ftap.append(accessorProbe, i)) {
                                             joiner0.join(inBuffer, writer);
                                             ftap.reset(inBuffer, true);
                                         } else
@@ -390,7 +427,7 @@
                                     ByteBuffer outbuf = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessor1, i)) {
+                                        if (appender.append(accessorProbe, i)) {
                                             break;
                                         } else {
                                             write(entry, outbuf);
@@ -415,8 +452,8 @@
                     if (memoryForHashtable != memsize - 2) {
                         for (int i = 0; i < B; i++) {
                             ByteBuffer buf = bufferForPartitions[i];
-                            accessor1.reset(buf);
-                            if (accessor1.getTupleCount() > 0) {
+                            accessorProbe.reset(buf);
+                            if (accessorProbe.getTupleCount() > 0) {
                                 write(i, buf);
                             }
                             closeWriter(i);
@@ -430,40 +467,43 @@
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
                         for (int partitionid = 0; partitionid < B; partitionid++) {
-                            RunFileWriter rWriter = rWriters[partitionid];
-                            RunFileWriter sWriter = sWriters[partitionid];
-                            if (rWriter == null || sWriter == null) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
 
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
-                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                    nullWriters1);
 
-                            RunFileReader rReader = rWriter.createReader();
-                            rReader.open();
-                            while (rReader.nextFrame(inBuffer)) {
-                                ByteBuffer copyBuffer = ctx.allocateFrame();
-                                FrameUtils.copy(inBuffer, copyBuffer);
-                                joiner.build(copyBuffer);
-                                inBuffer.clear();
+                            if (buildWriter != null) {
+                                RunFileReader buildReader = buildWriter.createReader();
+                                buildReader.open();
+                                while (buildReader.nextFrame(inBuffer)) {
+                                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                                    FrameUtils.copy(inBuffer, copyBuffer);
+                                    joiner.build(copyBuffer);
+                                    inBuffer.clear();
+                                }
+                                buildReader.close();
                             }
-                            rReader.close();
 
                             // probe
-                            RunFileReader sReader = sWriter.createReader();
-                            sReader.open();
-                            while (sReader.nextFrame(inBuffer)) {
+                            RunFileReader probeReader = probeWriter.createReader();
+                            probeReader.open();
+                            while (probeReader.nextFrame(inBuffer)) {
                                 joiner.join(inBuffer, writer);
                                 inBuffer.clear();
                             }
-                            sReader.close();
+                            probeReader.close();
                             joiner.closeJoin(writer);
                         }
                     }
                     writer.close();
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(PROBERELATION, null);
+                    env.set(BUILDRELATION, null);
                     env.set(JOINER0, null);
                     env.set(MEM_HASHTABLE, null);
                     env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createWorkspaceFile(largeRelation);
+                        FileReference file = ctx.createWorkspaceFile(relationName);
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        sWriters[i] = writer;
+                        probeWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
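The swapped source edges make the build-and-partition activity consume input 1 and the partition-and-join activity consume input 0, so input 0 is now the outer (probe) relation, which is what left outer join requires. The null-writer instantiation idiom repeated in all three join descriptors of this patch could be factored as below; the helper is hypothetical, but the loop body is verbatim from the hunks:

    static INullWriter[] createNullWriters(boolean isLeftOuter, INullWriterFactory[] factories) {
        if (!isLeftOuter) {
            return null; // inner join: no padding tuple is ever built
        }
        INullWriter[] writers = new INullWriter[factories.length];
        for (int i = 0; i < factories.length; i++) {
            writers[i] = factories[i].createNullWriter();
        }
        return writers;
    }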
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -21,8 +22,10 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
 public class InMemoryHashJoin {
     private final Link[] table;
     private final List<ByteBuffer> buffers;
-    private final FrameTupleAccessor accessor0;
-    private final ITuplePartitionComputer tpc0;
-    private final FrameTupleAccessor accessor1;
-    private final ITuplePartitionComputer tpc1;
+    private final FrameTupleAccessor accessorBuild;
+    private final ITuplePartitionComputer tpcBuild;
+    private final FrameTupleAccessor accessorProbe;
+    private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
     private final ByteBuffer outBuffer;
-
+    private final boolean isLeftOuter;
+    private final ArrayTupleBuilder nullTupleBuild;
+
     public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator) {
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+            throws HyracksDataException {
         table = new Link[tableSize];
         buffers = new ArrayList<ByteBuffer>();
-        this.accessor0 = accessor0;
-        this.tpc0 = tpc0;
-        this.accessor1 = accessor1;
-        this.tpc1 = tpc1;
+        this.accessorBuild = accessor1;
+        this.tpcBuild = tpc1;
+        this.accessorProbe = accessor0;
+        this.tpcProbe = tpc0;
         appender = new FrameTupleAppender(ctx.getFrameSize());
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        this.isLeftOuter = isLeftOuter;
+        if (isLeftOuter) {
+            int fieldCountOuter = accessor1.getFieldCount();
+            nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            for (int i = 0; i < fieldCountOuter; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuild.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
         buffers.add(buffer);
         int bIndex = buffers.size() - 1;
-        accessor0.reset(buffer);
-        int tCount = accessor0.getTupleCount();
+        accessorBuild.reset(buffer);
+        int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpc0.partition(accessor0, i, table.length);
+            int entry = tpcBuild.partition(accessorBuild, i, table.length);
             long tPointer = (((long) bIndex) << 32) + i;
             Link link = table[entry];
             if (link == null) {
@@ -70,29 +88,41 @@
     }
 
     public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessor1.reset(buffer);
-        int tupleCount1 = accessor1.getTupleCount();
-        for (int i = 0; i < tupleCount1; ++i) {
-            int entry = tpc1.partition(accessor1, i, table.length);
+        accessorProbe.reset(buffer);
+        int tupleCount0 = accessorProbe.getTupleCount();
+        for (int i = 0; i < tupleCount0; ++i) {
+            int entry = tpcProbe.partition(accessorProbe, i, table.length);
             Link link = table[entry];
+            boolean matchFound = false;
             if (link != null) {
                 for (int j = 0; j < link.size; ++j) {
                     long pointer = link.pointers[j];
                     int bIndex = (int) ((pointer >> 32) & 0xffffffff);
                     int tIndex = (int) (pointer & 0xffffffff);
-                    accessor0.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+                    accessorBuild.reset(buffers.get(bIndex));
+                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                        if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                        matchFound = true;
+                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                             flushFrame(outBuffer, writer);
                             appender.reset(outBuffer, true);
-                            if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                            if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                                 throw new IllegalStateException();
                             }
                         }
                     }
                 }
             }
+            if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                            nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
-        this.tableSize = tableSize;
         recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        this.tableSize = tableSize;
+    }
+
+    public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+        super(spec, 2, 1);
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        this.tableSize = tableSize;
     }
 
     @Override
@@ -66,10 +87,11 @@
         HashProbeActivityNode hpa = new HashProbeActivityNode();
 
         builder.addTask(hba);
-        builder.addSourceEdge(0, hba, 0);
+        builder.addSourceEdge(1, hba, 0);
 
         builder.addTask(hpa);
-        builder.addSourceEdge(1, hpa, 0);
+        builder.addSourceEdge(0, hpa, 0);
+
         builder.addTargetEdge(0, hpa, 0);
 
         builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
@@ -99,7 +128,7 @@
                             .createPartitioner();
                     joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                 }
 
                 @Override
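As in the Grace and hybrid descriptors, the source edges are swapped so the build activity reads input 1 and the probe activity input 0. A sketch of constructing the left-outer variant through the new constructor; everything except the constructor signature is a placeholder:

    static IOperatorDescriptor leftOuterInMemoryJoin(JobSpecification spec, int[] keys0, int[] keys1,
            IBinaryHashFunctionFactory[] hashFns, IBinaryComparatorFactory[] cmps, RecordDescriptor joinedRd,
            INullWriterFactory[] nullWriterFactories, int tableSize) {
        // Input 0 is the outer/probe side, input 1 the inner/build side.
        return new InMemoryHashJoinOperatorDescriptor(spec, keys0, keys1, hashFns, cmps, joinedRd,
                true, nullWriterFactories, tableSize);
    }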
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..5ee8fd5
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+    private final FrameTupleAccessor accessorInner;
+    private final FrameTupleAccessor accessorOuter;
+    private final FrameTupleAppender appender;
+    private final ITuplePairComparator tpComparator;
+    private final ByteBuffer outBuffer;
+    private final ByteBuffer innerBuffer;
+    private final List<ByteBuffer> outBuffers;
+    private final int memSize;
+    private final IHyracksStageletContext ctx;
+    private RunFileReader runFileReader;
+    private int currentMemSize = 0;
+    private final RunFileWriter runFileWriter;
+
+    public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+        this.accessorInner = accessor1;
+        this.accessorOuter = accessor0;
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.tpComparator = comparators;
+        this.outBuffer = ctx.allocateFrame();
+        this.innerBuffer = ctx.allocateFrame();
+        this.appender.reset(outBuffer, true);
+        this.outBuffers = new ArrayList<ByteBuffer>();
+        this.memSize = memSize;
+        this.ctx = ctx;
+
+        FileReference file = ctx.getJobletContext().createWorkspaceFile(
+                this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        runFileWriter.open();
+    }
+
+    public void cache(ByteBuffer buffer) throws HyracksDataException {
+        runFileWriter.nextFrame(buffer);
+    }
+
+    public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+        if (outBuffers.size() < memSize - 3) {
+            createAndCopyFrame(outerBuffer);
+            return;
+        }
+        if (currentMemSize < memSize - 3) {
+            reloadFrame(outerBuffer);
+            return;
+        }
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        currentMemSize = 0;
+        reloadFrame(outerBuffer);
+    }
+
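+    /**
+     * Grows the in-memory block during the first pass by allocating a new
+     * frame and copying the incoming outer frame into it.
+     */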
+    private void createAndCopyFrame(ByteBuffer outerBuffer) {
+        ByteBuffer outerBufferCopy = ctx.allocateFrame();
+        FrameUtils.copy(outerBuffer, outerBufferCopy);
+        outBuffers.add(outerBufferCopy);
+        currentMemSize++;
+    }
+
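+    /**
+     * Reuses an already-allocated frame of the block for the next incoming
+     * outer frame, avoiding further allocation.
+     */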
+    private void reloadFrame(ByteBuffer outerBuffer) {
+        outBuffers.get(currentMemSize).clear();
+        FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+        currentMemSize++;
+    }
+
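+    /**
+     * Joins every outer tuple in outerBuffer with every inner tuple in
+     * innerBuffer, emitting the concatenated pair whenever the tuple-pair
+     * comparator reports a match; full output frames are flushed downstream.
+     */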
+    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+            throws HyracksDataException {
+        accessorOuter.reset(outerBuffer);
+        accessorInner.reset(innerBuffer);
+        int tupleCount0 = accessorOuter.getTupleCount();
+        int tupleCount1 = accessorInner.getTupleCount();
+
+        for (int i = 0; i < tupleCount0; ++i) {
+            for (int j = 0; j < tupleCount1; ++j) {
+                int c = compare(accessorOuter, i, accessorInner, j);
+                if (c == 0) {
+                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                        flushFrame(outBuffer, writer);
+                        appender.reset(outBuffer, true);
+                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
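+    /**
+     * Seals the inner-relation run file once the inner input is exhausted.
+     */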
+    public void closeCache() throws HyracksDataException {
+        if (runFileWriter != null) {
+            runFileWriter.close();
+        }
+    }
+
+    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+        // Only the first currentMemSize buffers belong to the final (possibly
+        // partial) block; any buffers beyond that still hold frames from the
+        // previous block, and joining them again would emit duplicate results.
+        for (int i = 0; i < currentMemSize; i++) {
+            ByteBuffer outBuffer = outBuffers.get(i);
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        outBuffers.clear();
+        currentMemSize = 0;
+
+        if (appender.getTupleCount() > 0) {
+            flushFrame(outBuffer, writer);
+        }
+    }
+
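+    /**
+     * Rewinds the buffer, hands it to the writer, and rewinds it again so it
+     * is immediately ready for reuse.
+     */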
+    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
+
+    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        return tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..1436e30
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
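+/**
+ * Operator descriptor for the block nested-loop join. The join is split into
+ * two activities connected by a blocking edge: JoinCacheActivityNode consumes
+ * input 1 (the inner relation) and spills it to a run file, and only after it
+ * completes does NestedLoopJoinActivityNode stream input 0 (the outer
+ * relation) through the joiner and emit results.
+ *
+ * A minimal wiring sketch; the scan operators, comparator factory, record
+ * descriptor, and memory budget are assumed to be defined elsewhere:
+ *
+ * <pre>
+ * NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, cmpFactory, joinRecDesc, memSize);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), outerScan, 0, join, 0); // outer input
+ * spec.connect(new OneToOneConnectorDescriptor(spec), innerScan, 0, join, 1); // inner input, cached first
+ * </pre>
+ */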
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final String JOINER = "joiner";
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePairComparatorFactory comparatorFactory;
+    private final int memSize;
+
+    public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+            RecordDescriptor recordDescriptor, int memSize) {
+        super(spec, 2, 1);
+        this.comparatorFactory = comparatorFactory;
+        this.recordDescriptors[0] = recordDescriptor;
+        this.memSize = memSize;
+    }
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        JoinCacheActivityNode jc = new JoinCacheActivityNode();
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode();
+
+        builder.addTask(jc);
+        builder.addSourceEdge(1, jc, 0);
+
+        builder.addTask(nlj);
+        builder.addSourceEdge(0, nlj, 0);
+
+        builder.addTargetEdge(0, nlj, 0);
+        builder.addBlockingEdge(jc, nlj);
+    }
+
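+    /**
+     * Sink activity that materializes the inner relation: each incoming frame
+     * is copied and appended to the joiner's run file, and the joiner is
+     * published through the operator environment for the join activity.
+     */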
+    private class JoinCacheActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(buffer, copyBuffer);
+                    copyBuffer.flip();
+                    joiner.cache(copyBuffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeCache();
+                    env.set(JOINER, joiner);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+
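+    /**
+     * Join activity that retrieves the joiner cached by JoinCacheActivityNode
+     * and streams the outer relation through it, forwarding joined frames to
+     * the output writer.
+     */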
+    private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = (NestedLoopJoin) env.get(JOINER);
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    joiner.join(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeJoin(writer);
+                    writer.close();
+                    env.set(JOINER, null);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
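+/**
+ * Replicates its single input stream to outputArity outputs: every incoming
+ * frame is flushed, unchanged, to each registered output writer.
+ *
+ * A minimal wiring sketch; the source and sink operators are assumed to be
+ * defined elsewhere:
+ *
+ * <pre>
+ * SplitOperatorDescriptor split = new SplitOperatorDescriptor(spec, recDesc, 2);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), source, 0, split, 0);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), split, 0, sink0, 0);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), split, 1, sink1, 0);
+ * </pre>
+ */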
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+            @Override
+            public void close() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                // Forward the flush to every output writer so replicated
+                // frames are not left buffered downstream.
+                for (IFrameWriter writer : writers) {
+                    writer.flush();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    FrameUtils.flushFrame(buffer, writer);
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 603b194..3742e91 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -240,12 +240,12 @@
         fta2.reset(buf2);
         for (int f = 0; f < comparators.length; ++f) {
             int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : buf1.getShort(j1 + (fIdx - 1) * 2);
-            int f1End = buf1.getShort(j1 + fIdx * 2);
+            int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+            int f1End = buf1.getInt(j1 + fIdx * 4);
             int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
             int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : buf2.getShort(j2 + (fIdx - 1) * 2);
-            int f2End = buf2.getShort(j2 + fIdx * 2);
+            int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+            int f2End = buf2.getInt(j2 + fIdx * 4);
             int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
             int l2 = f2End - f2Start;
             int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
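+/**
+ * Merges nInputs streams with identical record descriptors into a single
+ * output stream: the shared writer is opened when the first input opens,
+ * frames from all inputs are forwarded under a common lock, and the writer
+ * is closed once every input has closed.
+ *
+ * A minimal wiring sketch; the source operators and record descriptor are
+ * assumed to be defined elsewhere:
+ *
+ * <pre>
+ * UnionAllOperatorDescriptor union = new UnionAllOperatorDescriptor(spec, 2, recDesc);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), src0, 0, union, 0);
+ * spec.connect(new OneToOneConnectorDescriptor(spec), src1, 0, union, 1);
+ * </pre>
+ */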
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+        super(spec, nInputs, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        UnionActivityNode uba = new UnionActivityNode();
+        builder.addTask(uba);
+        for (int i = 0; i < inputArity; ++i) {
+            builder.addSourceEdge(i, uba, i);
+        }
+        builder.addTargetEdge(0, uba, 0);
+    }
+
+    private class UnionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            return new UnionOperator(ctx, inRecordDesc);
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return UnionAllOperatorDescriptor.this;
+        }
+
+    }
+
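+    /**
+     * Pushable runtime for the union: hands out one IFrameWriter per input,
+     * all funneling into the single shared output writer, with open/close
+     * reference-counted so the output is opened once and closed exactly when
+     * the last input finishes.
+     */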
+    private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+        private int nOpened;
+
+        private int nClosed;
+
+        public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+            nOpened = 0;
+            nClosed = 0;
+        }
+
+        @Override
+        public int getInputArity() {
+            return inputArity;
+        }
+
+        @Override
+        public IFrameWriter getInputFrameWriter(int index) {
+            return new IFrameWriter() {
+                @Override
+                public void open() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nOpened == 1) {
+                            writer.open();
+                        }
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.nextFrame(buffer);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nClosed == inputArity) {
+                            writer.close();
+                        }
+                    }
+                }
+            };
+        }
+    }
+}
\ No newline at end of file