1. Ray's fix for the TPCH hash outer join test
2. Add the block nested-loop join runtime operators, plus the ITuplePairComparator and ITuplePairComparatorFactory interfaces
3. Add TPCH tests for the block nested-loop join

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@414 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..5e7ed12
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
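+/**
+ * Compares a tuple of the outer input with a tuple of the inner input.
+ * A return value of zero means the pair satisfies the join condition.
+ */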
+public interface ITuplePairComparator {
+
+    public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+            throws HyracksDataException;
+
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..be06fa3
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
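+/**
+ * Serializable factory for {@link ITuplePairComparator} instances, created at
+ * runtime on the node where the join executes.
+ */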
+public interface ITuplePairComparatorFactory extends Serializable {
+
+    public ITuplePairComparator createTuplePairComparator();
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..28251a5
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,142 @@
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
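+/**
+ * Runtime for a block nested-loop join. Frames of the inner input are spilled
+ * to a run file via {@link #cache(ByteBuffer)}, while frames of the outer
+ * input are collected into an in-memory block of up to memSize - 3 frames;
+ * each full block is joined against a complete scan of the run file.
+ */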
+public class NestedLoopJoin {
+    private final FrameTupleAccessor accessorInner;
+    private final FrameTupleAccessor accessorOuter;
+    private final FrameTupleAppender appender;
+    private final ITuplePairComparator tpComparator;
+    private final ByteBuffer outBuffer;
+    private final ByteBuffer innerBuffer;
+    private final List<ByteBuffer> outBuffers;
+    private final int memSize;
+    private final IHyracksStageletContext ctx;
+    private RunFileReader runFileReader;
+    private int currentMemSize = 0;
+    private final RunFileWriter runFileWriter;
+
+    public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+        this.accessorInner = accessor1;
+        this.accessorOuter = accessor0;
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.tpComparator = comparators;
+        this.outBuffer = ctx.allocateFrame();
+        this.innerBuffer = ctx.allocateFrame();
+        this.appender.reset(outBuffer, true);
+        this.outBuffers = new ArrayList<ByteBuffer>();
+        this.memSize = memSize;
+        this.ctx = ctx;
+
+        FileReference file = ctx.getJobletContext().createWorkspaceFile(
+                this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        runFileWriter.open();
+    }
+
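+    // Spill one frame of the inner input to the run file.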
+    public void cache(ByteBuffer buffer) throws HyracksDataException {
+        runFileWriter.nextFrame(buffer);
+    }
+
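+    // Add an outer frame to the current in-memory block; when the block is
+    // full, join it against the spilled inner input and start a new block.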
+    public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+        if (outBuffers.size() < memSize - 3) {
+            ByteBuffer outerBufferCopy = ctx.allocateFrame();
+            FrameUtils.copy(outerBuffer, outerBufferCopy);
+            outBuffers.add(outerBufferCopy);
+            currentMemSize++;
+            return;
+        }
+        if (currentMemSize < memSize - 3) {
+            FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+            currentMemSize++;
+            return;
+        }
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        currentMemSize = 0;
+        // Keep the frame that triggered the flush: it starts the next block.
+        FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+        currentMemSize++;
+    }
+
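+    // Compare every outer tuple with every inner tuple and append the
+    // concatenated matching pairs to the output frame.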
+    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+            throws HyracksDataException {
+        accessorOuter.reset(outerBuffer);
+        accessorInner.reset(innerBuffer);
+        int tupleCount0 = accessorOuter.getTupleCount();
+        int tupleCount1 = accessorInner.getTupleCount();
+
+        for (int i = 0; i < tupleCount0; ++i) {
+            for (int j = 0; j < tupleCount1; ++j) {
+                int c = compare(accessorOuter, i, accessorInner, j);
+                if (c == 0) {
+                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                        flushFrame(outBuffer, writer);
+                        appender.reset(outBuffer, true);
+                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
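+    // Finish writing the run file that holds the inner input.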
+    public void closeCache() throws HyracksDataException {
+        if (runFileWriter != null) {
+            runFileWriter.close();
+        }
+    }
+
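+    // Join the last (possibly partial) block and flush any buffered output.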
+    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+        // Only the first currentMemSize buffers belong to the final block; the
+        // rest still hold frames from an earlier block that was already joined.
+        for (int i = 0; i < currentMemSize; i++) {
+            ByteBuffer outBuffer = outBuffers.get(i);
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        outBuffers.clear();
+        currentMemSize = 0;
+
+        if (appender.getTupleCount() > 0) {
+            flushFrame(outBuffer, writer);
+        }
+    }
+
+    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
+
+    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        return tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..108e6a9
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,142 @@
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
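+/**
+ * Nested-loop join operator. JoinCacheActivityNode materializes input 1 (the
+ * inner side) into a run file; NestedLoopJoinActivityNode then streams input 0
+ * (the outer side) through the joiner and emits the result on output 0. The
+ * blocking edge between the two activities guarantees the cache is complete
+ * before the join starts.
+ */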
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final String JOINER = "joiner";
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePairComparatorFactory comparatorFactory;
+    private final int memSize;
+
+    public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+            RecordDescriptor recordDescriptor, int memSize) {
+        super(spec, 2, 1);
+        this.comparatorFactory = comparatorFactory;
+        this.recordDescriptors[0] = recordDescriptor;
+        this.memSize = memSize;
+    }
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        JoinCacheActivityNode jc = new JoinCacheActivityNode();
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode();
+
+        builder.addTask(jc);
+        builder.addSourceEdge(1, jc, 0);
+
+        builder.addTask(nlj);
+        builder.addSourceEdge(0, nlj, 0);
+
+        builder.addTargetEdge(0, nlj, 0);
+        builder.addBlockingEdge(jc, nlj);
+    }
+
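+    // Consumes input 1, spills it to the joiner's run file, and publishes the
+    // joiner in the operator environment for the join activity.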
+    private class JoinCacheActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(buffer, copyBuffer);
+                    copyBuffer.flip();
+                    joiner.cache(copyBuffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeCache();
+                    env.set(JOINER, joiner);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+
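+    // Retrieves the cached joiner from the environment and streams input 0
+    // through it, writing joined frames to the output.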
+    private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = (NestedLoopJoin) env.get(JOINER);
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    joiner.join(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeJoin(writer);
+                    writer.close();
+                    env.set(JOINER, null);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 26b3dab..db5985d 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -16,6 +16,7 @@
 
 import java.io.DataOutput;
 import java.io.File;
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -68,6 +69,11 @@
             return new INullWriter() {
                 @Override
                 public void writeNull(DataOutput out) throws HyracksDataException {
+                    try {
+                        out.writeShort(0);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
             };
         }
@@ -361,7 +367,7 @@
         // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
         //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
         // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
-        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
@@ -443,7 +449,7 @@
         // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
         //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
         // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
-        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
@@ -525,7 +531,7 @@
         // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
         //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
         // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
-        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
new file mode 100644
index 0000000..32c882a
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
+    private static final boolean DEBUG = true;
+
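+    // Builds single-field equi-join comparators from a binary comparator and
+    // the positions of the join fields in the outer and inner tuples.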
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator() {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }
+
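+    // Compares field0 of the outer tuple with field1 of the inner tuple using
+    // the wrapped binary comparator.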
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            return bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+        }
+    }
+
+    /*
+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
+     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
+     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
+     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
+     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
+     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
+     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
+     * NULL, O_COMMENT VARCHAR(79) NOT NULL );
+     */
+    @Test
+    public void customerOrderCIDJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 20);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinMulti() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinAutoExpand() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+}
\ No newline at end of file