Added UnionAll Operator and test
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@418 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 4a1c78a..293e626 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -251,13 +251,11 @@
for (int i : tasks.get(hanId)) {
IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
opNumPartitions.get(op.getOperatorId()));
- OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+ OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+ executor);
stagelet.setOperator(op.getOperatorId(), i, or);
if (inputs != null) {
for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
IConnectorDescriptor conn = inputs.get(j);
OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
.getOperatorId();
@@ -276,7 +274,7 @@
portMap.put(piId, endpoint);
IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
- or.setFrameReader(reader);
+ or.setFrameReader(j, reader);
}
}
honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class OperatorRunnable implements Runnable {
+ private final IHyracksStageletContext ctx;
private final IOperatorNodePushable opNode;
- private IFrameReader reader;
- private ByteBuffer buffer;
+ private final int nInputs;
+ private final Executor executor;
+ private IFrameReader[] readers;
private volatile boolean abort;
- public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+ public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+ this.ctx = ctx;
this.opNode = opNode;
- buffer = ctx.allocateFrame();
+ this.nInputs = nInputs;
+ this.executor = executor;
+ readers = new IFrameReader[nInputs];
}
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
opNode.setOutputFrameWriter(index, writer, recordDesc);
}
- public void setFrameReader(IFrameReader reader) {
- this.reader = reader;
+ public void setFrameReader(int inputIdx, IFrameReader reader) {
+ this.readers[inputIdx] = reader;
}
public void abort() {
@@ -50,20 +57,28 @@
public void run() {
try {
opNode.initialize();
- if (reader != null) {
- IFrameWriter writer = opNode.getInputFrameWriter(0);
- writer.open();
- reader.open();
- while (readFrame()) {
- if (abort) {
- break;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ if (nInputs > 0) {
+ final Semaphore sem = new Semaphore(nInputs - 1);
+ for (int i = 1; i < nInputs; ++i) {
+ final IFrameReader reader = readers[i];
+ final IFrameWriter writer = opNode.getInputFrameWriter(i);
+ sem.acquire();
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ pushFrames(reader, writer);
+ } catch (HyracksDataException e) {
+ } finally {
+ sem.release();
+ }
+ }
+ });
}
- reader.close();
- writer.close();
+ try {
+ pushFrames(readers[0], opNode.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(nInputs - 1);
+ }
}
opNode.deinitialize();
} catch (Exception e) {
@@ -71,8 +86,20 @@
}
}
- protected boolean readFrame() throws HyracksDataException {
- return reader.nextFrame(buffer);
+ private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ writer.open();
+ reader.open();
+ while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ reader.close();
+ writer.close();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+ public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+ super(spec, nInputs, 1);
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ UnionActivityNode uba = new UnionActivityNode();
+ builder.addTask(uba);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, uba, i);
+ }
+ builder.addTargetEdge(0, uba, 0);
+ }
+
+ private class UnionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public UnionActivityNode() {
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ return new UnionOperator(ctx, inRecordDesc);
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return UnionAllOperatorDescriptor.this;
+ }
+
+ }
+
+ private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+ private int nOpened;
+
+ private int nClosed;
+
+ public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+ nOpened = 0;
+ nClosed = 0;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nOpened == 1) {
+ writer.open();
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.nextFrame(buffer);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nClosed == inputArity) {
+ writer.close();
+ }
+ }
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionTest extends AbstractIntegrationTest {
+ @Test
+ public void union01() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+ new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
+
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor csvScanner01 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner01, NC2_ID, NC1_ID);
+
+ FileScanOperatorDescriptor csvScanner02 = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner02, NC2_ID, NC1_ID);
+
+ UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+ spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+}
\ No newline at end of file