Merged in all changes from Aqua/Asterix

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@405 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
         return false;
     }
     
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
-            int length) {
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
 
-        if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        int slotsLen1 = fieldSlots1.length * 4;
+        int length1 = slotsLen1 + dataLen1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
             ByteBuffer src0 = accessor0.getBuffer();
             int slotsLen0 = accessor0.getFieldSlotsLength();
             int dataLen0 = length0 - slotsLen0;
             // Copy slots from accessor0 verbatim
             System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
-            // Copy slots from fieldSlots with the following transformation:
-            // newSlotIdx = oldSlotIdx + dataLen0
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+            // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < fieldSlots1.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
             System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
-                    + fieldSlots.length * 4, dataLen0);
-            // Copy bytes
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
-                    + dataLen0, length);
-            tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+                    + slotsLen1, dataLen0);
+            // Copy bytes1
+            System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
+                    + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
         return false;
     }
 
+    public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+            int tIndex1) {
+        int slotsLen0 = fieldSlots0.length * 4;
+        int length0 = slotsLen0 + dataLen0;
+        
+        int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+        int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+        int length1 = endOffset1 - startOffset1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            ByteBuffer src1 = accessor1.getBuffer();
+            int slotsLen1 = accessor1.getFieldSlotsLength();
+            int dataLen1 = length1 - slotsLen1;
+            // Copy fieldSlots0 verbatim
+            for (int i = 0; i < fieldSlots0.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+            }
+            // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < slotsLen1 / 4; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+            }
+            // Copy bytes0
+            System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1, 
+                    dataLen0);
+            // Copy data1
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0 
+                    + slotsLen1 + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
         int fTargetSlotsLength = fields.length * 4;
         int length = fTargetSlotsLength;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String RELATION0 = "Rel0";
+    private static final String RELATION1 = "Rel1";
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -56,6 +58,8 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+        HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+        HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
         JoinActivityNode join = new JoinActivityNode();
 
-        builder.addTask(rpart);
-        builder.addSourceEdge(0, rpart, 0);
+        builder.addTask(part0);
+        builder.addSourceEdge(0, part0, 0);
 
-        builder.addTask(spart);
-        builder.addSourceEdge(1, spart, 0);
+        builder.addTask(part1);
+        builder.addSourceEdge(1, part1, 0);
 
         builder.addTask(join);
-        builder.addBlockingEdge(rpart, spart);
-        builder.addBlockingEdge(spart, join);
+        builder.addBlockingEdge(part0, part1);
+        builder.addBlockingEdge(part1, join);
 
         builder.addTargetEdge(0, join, 0);
     }
@@ -217,18 +241,24 @@
             }
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
-                    sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+                    buildWriters = (RunFileWriter[]) env.get(RELATION1);
+                    probeWriters = (RunFileWriter[]) env.get(RELATION0);
 
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
                     // buffer
                     int tableSize = (int) (numPartitions * recordsPerFrame * factor);
                     for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                        RunFileWriter rWriter = rWriters[partitionid];
-                        RunFileWriter sWriter = sWriters[partitionid];
-                        if (rWriter == null || sWriter == null) {
+                        RunFileWriter buildWriter = buildWriters[partitionid];
+                        RunFileWriter probeWriter = probeWriters[partitionid];
+                        if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                             continue;
                         }
                         joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                                 hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                new FrameTuplePairComparator(keys0, keys1, comparators));
+                                new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
 
                         // build
-                        RunFileReader rReader = rWriter.createReader();
-                        rReader.open();
-                        while (rReader.nextFrame(buffer)) {
-                            ByteBuffer copyBuffer = ctx.allocateFrame();
-                            FrameUtils.copy(buffer, copyBuffer);
-                            joiner.build(copyBuffer);
-                            buffer.clear();
+                        if (buildWriter != null) {
+                            RunFileReader buildReader = buildWriter.createReader();
+                            buildReader.open();
+                            while (buildReader.nextFrame(buffer)) {
+                                ByteBuffer copyBuffer = ctx.allocateFrame();
+                                FrameUtils.copy(buffer, copyBuffer);
+                                joiner.build(copyBuffer);
+                                buffer.clear();
+                            }
+                            buildReader.close();
                         }
-                        rReader.close();
 
                         // probe
-                        RunFileReader sReader = sWriter.createReader();
-                        sReader.open();
-                        while (sReader.nextFrame(buffer)) {
+                        RunFileReader probeReader = probeWriter.createReader();
+                        probeReader.open();
+                        while (probeReader.nextFrame(buffer)) {
                             joiner.join(buffer, writer);
                             buffer.clear();
                         }
-                        sReader.close();
+                        probeReader.close();
                         joiner.closeJoin(writer);
                     }
                     writer.close();
@@ -276,8 +308,8 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(RELATION1, null);
+                    env.set(RELATION0, null);
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER0 = "joiner0";
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String BUILDRELATION = "BuildRel";
+    private static final String PROBERELATION = "ProbeRel";
     private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
     private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
     private final int memsize;
     private static final long serialVersionUID = 1L;
     private final int inputsize0;
     private final double factor;
+    private final int recordsPerFrame;
     private final int[] keys0;
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int recordsPerFrame;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     /**
      * @param spec
@@ -88,19 +92,39 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.factor = factor;
+        this.recordsPerFrame = recordsPerFrame;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
 
         builder.addTask(phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addTask(phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -127,12 +151,18 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
-                ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
-                        .createPartitioner();
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories).createPartitioner();
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
                 private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
 
                     for (int i = 0; i < B; i++) {
                         ByteBuffer buf = bufferForPartitions[i];
-                        accessor0.reset(buf);
-                        if (accessor0.getTupleCount() > 0) {
+                        accessorBuild.reset(buf);
+                        if (accessorBuild.getTupleCount() > 0) {
                             write(i, buf);
                         }
                         closeWriter(i);
@@ -165,17 +195,17 @@
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
 
                     if (memoryForHashtable != memsize - 2) {
-                        accessor0.reset(buffer);
-                        int tCount = accessor0.getTupleCount();
+                        accessorBuild.reset(buffer);
+                        int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc0.partition(accessor0, i, B);
+                                entry = hpcBuild.partition(accessorBuild, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer bufBi = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessor0, i)) {
+                                    if (appender.append(accessorBuild, i)) {
                                         break;
                                     } else {
                                         write(entry, bufBi);
@@ -184,15 +214,16 @@
                                     }
                                 }
                             } else {
-                                entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftappender.append(accessor0, i)) {
+                                        if (!ftappender.append(accessorBuild, i)) {
                                             build(inBuffer);
 
                                             ftappender.reset(inBuffer, true);
-                                        } else
+                                        } else {
                                             break;
+                                        }
                                     }
                                 } else {
                                     entry %= B;
@@ -200,7 +231,7 @@
                                     ByteBuffer bufBi = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessor0, i)) {
+                                        if (appender.append(accessorBuild, i)) {
                                             break;
                                         } else {
                                             write(entry, bufBi);
@@ -255,7 +286,7 @@
                     int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
                     joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                     bufferForPartitions = new ByteBuffer[B];
                     fWriters = new RunFileWriter[B];
                     for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String largeRelation;
+        private String relationName;
 
         public PartitionAndJoinActivityNode(String relationName) {
             super();
-            this.largeRelation = relationName;
+            this.relationName = relationName;
         }
 
         @Override
@@ -315,22 +346,28 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
-                private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
-                private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories);
-                ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
 
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
                 private final ByteBuffer inBuffer = ctx.allocateFrame();
                 private final ByteBuffer outBuffer = ctx.allocateFrame();
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private ByteBuffer[] bufferForPartitions;
                 private int B;
                 private int memoryForHashtable;
@@ -339,10 +376,10 @@
                 public void open() throws HyracksDataException {
                     joiner0 = (InMemoryHashJoin) env.get(JOINER0);
                     writer.open();
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+                    buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
                     B = (Integer) env.get(NUM_PARTITION);
                     memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
-                    sWriters = new RunFileWriter[B];
+                    probeWriters = new RunFileWriter[B];
                     bufferForPartitions = new ByteBuffer[B];
                     for (int i = 0; i < B; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     if (memoryForHashtable != memsize - 2) {
-                        accessor1.reset(buffer);
-                        int tupleCount1 = accessor1.getTupleCount();
-                        for (int i = 0; i < tupleCount1; ++i) {
+                        accessorProbe.reset(buffer);
+                        int tupleCount0 = accessorProbe.getTupleCount();
+                        for (int i = 0; i < tupleCount0; ++i) {
 
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc1.partition(accessor1, i, B);
+                                entry = hpcProbe.partition(accessorProbe, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer outbuf = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessor1, i)) {
+                                    if (appender.append(accessorProbe, i)) {
                                         break;
                                     } else {
                                         write(entry, outbuf);
@@ -374,10 +411,10 @@
                                     }
                                 }
                             } else {
-                                entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftap.append(accessor1, i)) {
+                                        if (!ftap.append(accessorProbe, i)) {
                                             joiner0.join(inBuffer, writer);
                                             ftap.reset(inBuffer, true);
                                         } else
@@ -390,7 +427,7 @@
                                     ByteBuffer outbuf = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessor1, i)) {
+                                        if (appender.append(accessorProbe, i)) {
                                             break;
                                         } else {
                                             write(entry, outbuf);
@@ -415,8 +452,8 @@
                     if (memoryForHashtable != memsize - 2) {
                         for (int i = 0; i < B; i++) {
                             ByteBuffer buf = bufferForPartitions[i];
-                            accessor1.reset(buf);
-                            if (accessor1.getTupleCount() > 0) {
+                            accessorProbe.reset(buf);
+                            if (accessorProbe.getTupleCount() > 0) {
                                 write(i, buf);
                             }
                             closeWriter(i);
@@ -430,40 +467,43 @@
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
                         for (int partitionid = 0; partitionid < B; partitionid++) {
-                            RunFileWriter rWriter = rWriters[partitionid];
-                            RunFileWriter sWriter = sWriters[partitionid];
-                            if (rWriter == null || sWriter == null) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
 
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
-                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                    nullWriters1);
 
-                            RunFileReader rReader = rWriter.createReader();
-                            rReader.open();
-                            while (rReader.nextFrame(inBuffer)) {
-                                ByteBuffer copyBuffer = ctx.allocateFrame();
-                                FrameUtils.copy(inBuffer, copyBuffer);
-                                joiner.build(copyBuffer);
-                                inBuffer.clear();
+                            if (buildWriter != null) {
+                                RunFileReader buildReader = buildWriter.createReader();
+                                buildReader.open();
+                                while (buildReader.nextFrame(inBuffer)) {
+                                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                                    FrameUtils.copy(inBuffer, copyBuffer);
+                                    joiner.build(copyBuffer);
+                                    inBuffer.clear();
+                                }
+                                buildReader.close();
                             }
-                            rReader.close();
 
                             // probe
-                            RunFileReader sReader = sWriter.createReader();
-                            sReader.open();
-                            while (sReader.nextFrame(inBuffer)) {
+                            RunFileReader probeReader = probeWriter.createReader();
+                            probeReader.open();
+                            while (probeReader.nextFrame(inBuffer)) {
                                 joiner.join(inBuffer, writer);
                                 inBuffer.clear();
                             }
-                            sReader.close();
+                            probeReader.close();
                             joiner.closeJoin(writer);
                         }
                     }
                     writer.close();
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(PROBERELATION, null);
+                    env.set(BUILDRELATION, null);
                     env.set(JOINER0, null);
                     env.set(MEM_HASHTABLE, null);
                     env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createWorkspaceFile(largeRelation);
+                        FileReference file = ctx.createWorkspaceFile(relationName);
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        sWriters[i] = writer;
+                        probeWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -21,8 +22,10 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
 public class InMemoryHashJoin {
     private final Link[] table;
     private final List<ByteBuffer> buffers;
-    private final FrameTupleAccessor accessor0;
-    private final ITuplePartitionComputer tpc0;
-    private final FrameTupleAccessor accessor1;
-    private final ITuplePartitionComputer tpc1;
+    private final FrameTupleAccessor accessorBuild;
+    private final ITuplePartitionComputer tpcBuild;
+    private final FrameTupleAccessor accessorProbe;
+    private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
     private final ByteBuffer outBuffer;
-
+    private final boolean isLeftOuter;
+    private final ArrayTupleBuilder nullTupleBuild;
+    
     public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator) {
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+            throws HyracksDataException {
         table = new Link[tableSize];
         buffers = new ArrayList<ByteBuffer>();
-        this.accessor0 = accessor0;
-        this.tpc0 = tpc0;
-        this.accessor1 = accessor1;
-        this.tpc1 = tpc1;
+        this.accessorBuild = accessor1;
+        this.tpcBuild = tpc1;
+        this.accessorProbe = accessor0;
+        this.tpcProbe = tpc0;
         appender = new FrameTupleAppender(ctx.getFrameSize());
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        this.isLeftOuter = isLeftOuter;        
+        if (isLeftOuter) {
+            int fieldCountOuter = accessor1.getFieldCount();
+            nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            for (int i = 0; i < fieldCountOuter; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuild.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
         buffers.add(buffer);
         int bIndex = buffers.size() - 1;
-        accessor0.reset(buffer);
-        int tCount = accessor0.getTupleCount();
+        accessorBuild.reset(buffer);
+        int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpc0.partition(accessor0, i, table.length);
+            int entry = tpcBuild.partition(accessorBuild, i, table.length);
             long tPointer = (((long) bIndex) << 32) + i;
             Link link = table[entry];
             if (link == null) {
@@ -70,29 +88,41 @@
     }
 
     public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessor1.reset(buffer);
-        int tupleCount1 = accessor1.getTupleCount();
-        for (int i = 0; i < tupleCount1; ++i) {
-            int entry = tpc1.partition(accessor1, i, table.length);
+        accessorProbe.reset(buffer);
+        int tupleCount0 = accessorProbe.getTupleCount();
+        for (int i = 0; i < tupleCount0; ++i) {
+            int entry = tpcProbe.partition(accessorProbe, i, table.length);
             Link link = table[entry];
+            boolean matchFound = false;
             if (link != null) {
                 for (int j = 0; j < link.size; ++j) {
                     long pointer = link.pointers[j];
                     int bIndex = (int) ((pointer >> 32) & 0xffffffff);
                     int tIndex = (int) (pointer & 0xffffffff);
-                    accessor0.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+                    accessorBuild.reset(buffers.get(bIndex));
+                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                        if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                        matchFound = true;
+                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                             flushFrame(outBuffer, writer);
                             appender.reset(outBuffer, true);
-                            if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                            if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                                 throw new IllegalStateException();
                             }
                         }
                     }
                 }
             }
+            if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
+                            .getSize())) {
+                        throw new IllegalStateException();
+                    }                  
+                }
+            }
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
-        this.tableSize = tableSize;
         recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        this.tableSize = tableSize;
+    }
+
+    public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+        super(spec, 2, 1);
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        this.tableSize = tableSize;
     }
 
     @Override
@@ -66,10 +87,11 @@
         HashProbeActivityNode hpa = new HashProbeActivityNode();
 
         builder.addTask(hba);
-        builder.addSourceEdge(0, hba, 0);
+        builder.addSourceEdge(1, hba, 0);
 
         builder.addTask(hpa);
-        builder.addSourceEdge(1, hpa, 0);
+        builder.addSourceEdge(0, hpa, 0);
+
         builder.addTargetEdge(0, hpa, 0);
 
         builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
@@ -99,7 +128,7 @@
                             .createPartitioner();
                     joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                 }
 
                 @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+            @Override
+            public void close() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    FrameUtils.flushFrame(bufferAccessor, writer);
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 4958a35..26b3dab 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
+import java.io.DataOutput;
 import java.io.File;
 
 import org.junit.Test;
@@ -23,8 +24,11 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +55,24 @@
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
     private static final boolean DEBUG = true;
 
+    static private class NoopNullWriterFactory implements INullWriterFactory {
+
+        private static final long serialVersionUID = 1L;
+        public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+        private NoopNullWriterFactory() {
+        }
+
+        @Override
+        public INullWriter createNullWriter() {
+            return new INullWriter() {
+                @Override
+                public void writeNull(DataOutput out) throws HyracksDataException {
+                }
+            };
+        }
+    }
+
     /*
      * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
@@ -274,6 +296,251 @@
     }
 
     @Test
+    public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
     public void customerOrderCIDJoinMulti() throws Exception {
         JobSpecification spec = new JobSpecification();
 
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
 
     public int getTokenLength();
 
+    public int getNumTokens();
+
     public void writeToken(DataOutput dos) throws IOException;
 
     public RecordDescriptor getTokenSchema();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
     private final char delimiter;
+    private final byte typeTag;
     private byte[] data;
     private int start;
     private int length;
@@ -38,8 +39,14 @@
     private int tokenStart;
     private int pos;
 
+    public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
+
     public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
@@ -88,6 +95,9 @@
 
     @Override
     public void writeToken(DataOutput dos) throws IOException {
+        if (typeTag > 0)
+            dos.write(typeTag);
+
         // WARNING: 2-byte length indicator is specific to UTF-8
         dos.writeShort((short) tokenLength);
         dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
+
+    // cannot be implemented for this tokenizer
+    @Override
+    public int getNumTokens() {
+        return -1;
+    }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
 
     private static final long serialVersionUID = 1L;
     private final char delimiter;
+    private final byte typeTag;
+
+    public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
 
     public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
     public IBinaryTokenizer createBinaryTokenizer() {
-        return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+        return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
     }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
-}
+
+    @Override
+    public int getNumTokens() {
+        return 0;
+    }
+}
\ No newline at end of file