Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution branch.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2862 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
index 82919ca..ba296ac 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -44,6 +44,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>appassembler-maven-plugin</artifactId>
+        <version>1.3</version>
         <executions>
           <execution>
             <configuration>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
index 2f0b00f..dd96db8 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
@@ -40,6 +40,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>appassembler-maven-plugin</artifactId>
+        <version>1.3</version>
         <executions>
           <execution>
             <configuration>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 61d4696..622942b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -52,30 +52,9 @@
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
 
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
-    private static class NoopNullWriterFactory implements INullWriterFactory {
-
-        private static final long serialVersionUID = 1L;
-        public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
-
-        private NoopNullWriterFactory() {
-        }
-
-        @Override
-        public INullWriter createNullWriter() {
-            return new INullWriter() {
-                @Override
-                public void writeNull(DataOutput out) throws HyracksDataException {
-                    try {
-                        out.writeShort(0);
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            };
-        }
-    }
 
     /*
      * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 1e60372..9233e39 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -15,9 +15,7 @@
 package edu.uci.ics.hyracks.tests.integration;
 
 import java.io.File;
-
 import org.junit.Test;
-
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -45,6 +44,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.tests.util.NoopNullWriterFactory;
 
 public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
     private static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -165,7 +165,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4, false,
+                null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -239,7 +240,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, false,
+                null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -313,7 +315,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6, false,
+                null);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -334,4 +337,84 @@
         runTest(spec);
     }
 
+    @Test
+    public void customerOrderCIDOuterJoinMulti() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java
new file mode 100644
index 0000000..d119509
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/util/NoopNullWriterFactory.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hyracks.tests.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NoopNullWriterFactory implements INullWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+    public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+    private NoopNullWriterFactory() {
+    }
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new INullWriter() {
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                try {
+                    out.writeShort(0);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index d593ef9..901f1fb2 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -40,6 +40,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>appassembler-maven-plugin</artifactId>
+        <version>1.3</version>
         <executions>
           <execution>
           <id>textclient</id>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index 3170306..6b3f603 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -36,6 +36,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>appassembler-maven-plugin</artifactId>
+        <version>1.3</version>
         <executions>
           <execution>
             <configuration>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 01ccdef..0ad0ff0 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -199,7 +199,7 @@
 
         if ("nestedloop".equalsIgnoreCase(algo)) {
             join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize, false, null);
 
         } else if ("gracehash".equalsIgnoreCase(algo)) {
             join = new GraceHashJoinOperatorDescriptor(