Merged hyracks_asterix_stabilization_left_outer_join
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2760 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 8cbd2d8..d153f90 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -44,6 +44,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -56,8 +57,8 @@
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
/**
- * Left input is broadcast and preserves its local properties.
- * Right input can be partitioned in any way.
+ * Left input is broadcast and preserves its local properties. Right input can
+ * be partitioned in any way.
*/
public class NLJoinPOperator extends AbstractJoinPOperator {
@@ -97,7 +98,7 @@
pp = pv1.getPartitioningProperty();
}
} else {
- pp = IPartitioningProperty.UNPARTITIONED;
+ pp = IPartitioningProperty.UNPARTITIONED;
}
List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
@@ -122,7 +123,8 @@
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
conditionInputSchemas[0] = propagatedSchema;
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -135,10 +137,19 @@
switch (kind) {
case INNER: {
- opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize);
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+ null);
break;
}
- case LEFT_OUTER:
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+ nullWriterFactories);
+ break;
+ }
default: {
throw new NotImplementedException();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 7e84229..6870e71 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -14,15 +14,18 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -42,9 +45,12 @@
private RunFileReader runFileReader;
private int currentMemSize = 0;
private final RunFileWriter runFileWriter;
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuilder;
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
- ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+ ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
this.accessorInner = accessor1;
this.accessorOuter = accessor0;
this.appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -56,6 +62,19 @@
this.memSize = memSize;
this.ctx = ctx;
+ this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter) {
+ int innerFieldCount = accessorInner.getFieldCount();
+ nullTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
+ DataOutput out = nullTupleBuilder.getDataOutput();
+ for (int i = 0; i < innerFieldCount; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuilder = null;
+ }
+
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
this.getClass().getSimpleName() + this.toString());
runFileWriter = new RunFileWriter(file, ctx.getIOManager());
@@ -108,9 +127,11 @@
int tupleCount1 = accessorInner.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
+ boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
int c = compare(accessorOuter, i, accessorInner, j);
if (c == 0) {
+ matchFound = true;
if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
@@ -120,6 +141,18 @@
}
}
}
+
+ if (!matchFound && isLeftOuter) {
+ if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+ nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+ nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index a699703..0be01c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -25,6 +25,8 @@
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -47,13 +49,18 @@
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize) {
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+ boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.recordDescriptors[0] = recordDescriptor;
this.memSize = memSize;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
}
@Override
@@ -111,6 +118,13 @@
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private JoinCacheTaskState state;
@@ -118,8 +132,11 @@
public void open() throws HyracksDataException {
state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
+
state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+ nullWriters1);
+
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 4842ad7..19479df 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -607,7 +607,7 @@
throws HyracksDataException {
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 61d4696..622942b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -52,30 +52,9 @@
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
- private static class NoopNullWriterFactory implements INullWriterFactory {
-
- private static final long serialVersionUID = 1L;
- public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
-
- private NoopNullWriterFactory() {
- }
-
- @Override
- public INullWriter createNullWriter() {
- return new INullWriter() {
- @Override
- public void writeNull(DataOutput out) throws HyracksDataException {
- try {
- out.writeShort(0);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
- }
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 1e60372..9233e39 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -15,9 +15,7 @@
package edu.uci.ics.hyracks.tests.integration;
import java.io.File;
-
import org.junit.Test;
-
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -45,6 +44,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
private static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -165,7 +165,8 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4, false,
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -239,7 +240,8 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, false,
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -313,7 +315,8 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6, false,
+ null);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -334,4 +337,84 @@
runTest(spec);
}
+ @Test
+ public void customerOrderCIDOuterJoinMulti() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java
new file mode 100644
index 0000000..d119509
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hyracks.tests.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NoopNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+ private NoopNullWriterFactory() {
+ }
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ try {
+ out.writeShort(0);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 01ccdef..0ad0ff0 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -199,7 +199,7 @@
if ("nestedloop".equalsIgnoreCase(algo)) {
join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize, false, null);
} else if ("gracehash".equalsIgnoreCase(algo)) {
join = new GraceHashJoinOperatorDescriptor(