Added UnionAll Operator and test

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@418 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 4a1c78a..293e626 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -251,13 +251,11 @@
                 for (int i : tasks.get(hanId)) {
                     IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
                             opNumPartitions.get(op.getOperatorId()));
-                    OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+                    OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+                            executor);
                     stagelet.setOperator(op.getOperatorId(), i, or);
                     if (inputs != null) {
                         for (int j = 0; j < inputs.size(); ++j) {
-                            if (j >= 1) {
-                                throw new IllegalStateException();
-                            }
                             IConnectorDescriptor conn = inputs.get(j);
                             OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
                                     .getOperatorId();
@@ -276,7 +274,7 @@
                             portMap.put(piId, endpoint);
                             IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
                                     opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
-                            or.setFrameReader(reader);
+                            or.setFrameReader(j, reader);
                         }
                     }
                     honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class OperatorRunnable implements Runnable {
+    private final IHyracksStageletContext ctx;
     private final IOperatorNodePushable opNode;
-    private IFrameReader reader;
-    private ByteBuffer buffer;
+    private final int nInputs;
+    private final Executor executor;
+    private IFrameReader[] readers;
     private volatile boolean abort;
 
-    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+        this.ctx = ctx;
         this.opNode = opNode;
-        buffer = ctx.allocateFrame();
+        this.nInputs = nInputs;
+        this.executor = executor;
+        readers = new IFrameReader[nInputs];
     }
 
     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         opNode.setOutputFrameWriter(index, writer, recordDesc);
     }
 
-    public void setFrameReader(IFrameReader reader) {
-        this.reader = reader;
+    public void setFrameReader(int inputIdx, IFrameReader reader) {
+        this.readers[inputIdx] = reader;
     }
 
     public void abort() {
@@ -50,20 +57,28 @@
     public void run() {
         try {
             opNode.initialize();
-            if (reader != null) {
-                IFrameWriter writer = opNode.getInputFrameWriter(0);
-                writer.open();
-                reader.open();
-                while (readFrame()) {
-                    if (abort) {
-                        break;
-                    }
-                    buffer.flip();
-                    writer.nextFrame(buffer);
-                    buffer.compact();
+            if (nInputs > 0) {
+                final Semaphore sem = new Semaphore(nInputs - 1);
+                for (int i = 1; i < nInputs; ++i) {
+                    final IFrameReader reader = readers[i];
+                    final IFrameWriter writer = opNode.getInputFrameWriter(i);
+                    sem.acquire();
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                pushFrames(reader, writer);
+                            } catch (HyracksDataException e) {
+                            } finally {
+                                sem.release();
+                            }
+                        }
+                    });
                 }
-                reader.close();
-                writer.close();
+                try {
+                    pushFrames(readers[0], opNode.getInputFrameWriter(0));
+                } finally {
+                    sem.acquire(nInputs - 1);
+                }
             }
             opNode.deinitialize();
         } catch (Exception e) {
@@ -71,8 +86,20 @@
         }
     }
 
-    protected boolean readFrame() throws HyracksDataException {
-        return reader.nextFrame(buffer);
+    private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+        // Copies frames from reader to writer until the reader is drained or abort is set.
+        final ByteBuffer frame = ctx.allocateFrame();
+        writer.open();
+        reader.open();
+        boolean hasFrame = reader.nextFrame(frame);
+        while (hasFrame && !abort) {
+            frame.flip();
+            writer.nextFrame(frame);
+            frame.compact();
+            hasFrame = reader.nextFrame(frame);
+        }
+        reader.close();
+        writer.close();
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+/** Operator descriptor that merges the frames of all of its inputs into a single output. */
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+        super(spec, nInputs, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        UnionActivityNode unionActivity = new UnionActivityNode();
+        builder.addTask(unionActivity);
+        // Operator input k feeds activity input k; the single output maps to output 0.
+        for (int input = 0; input < inputArity; input++) {
+            builder.addSourceEdge(input, unionActivity, input);
+        }
+        builder.addTargetEdge(0, unionActivity, 0);
+    }
+
+    /** Sole activity of this operator; builds the runtime {@link UnionOperator}. */
+    private class UnionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+        public UnionActivityNode() {
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            return new UnionOperator(ctx, recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return UnionAllOperatorDescriptor.this;
+        }
+    }
+
+    /** Runtime that forwards every input to the one output, serialized on this object's monitor. */
+    private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+        private int nOpened = 0;
+        private int nClosed = 0;
+        public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+        }
+
+        @Override
+        public int getInputArity() {
+            return inputArity;
+        }
+
+        @Override
+        public IFrameWriter getInputFrameWriter(int index) {
+            return new InputWriter();
+        }
+
+        /** Forwarding writer handed out per input; all instances share the enclosing counters. */
+        private class InputWriter implements IFrameWriter {
+            @Override
+            public void open() throws HyracksDataException {
+                synchronized (UnionOperator.this) {
+                    if (nOpened++ == 0) { // first open also opens the output
+                        writer.open();
+                    }
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                synchronized (UnionOperator.this) {
+                    writer.nextFrame(buffer);
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                synchronized (UnionOperator.this) {
+                    writer.flush();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                synchronized (UnionOperator.this) {
+                    if (++nClosed == inputArity) { // last close also closes the output
+                        writer.close();
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+/** Integration test that feeds two file scans through a UnionAllOperatorDescriptor into a printer. */
+public class UnionTest extends AbstractIntegrationTest {
+    @Test
+    public void union01() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] splits = {
+                new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+                new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) };
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        ISerializerDeserializer[] fields = { UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor desc = new RecordDescriptor(fields);
+
+        FileScanOperatorDescriptor csvScanner01 = createWordScanner(spec, splitProvider, desc);
+        FileScanOperatorDescriptor csvScanner02 = createWordScanner(spec, splitProvider, desc);
+
+        UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+        spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    /** Builds a comma-delimited, single-string-field scanner constrained to NC2_ID and NC1_ID. */
+    private FileScanOperatorDescriptor createWordScanner(JobSpecification spec,
+            IFileSplitProvider splitProvider, RecordDescriptor desc) {
+        IValueParserFactory[] parsers = { UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(parsers, ','), desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner, NC2_ID, NC1_ID);
+        return scanner;
+    }
+}
\ No newline at end of file