New classes added for testing external sort with replacement selection
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@817 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
new file mode 100644
index 0000000..939935f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -0,0 +1,89 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class LimitOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final int outputLimit;
+
+ public LimitOperatorDescriptor(JobSpecification spec,
+ RecordDescriptor rDesc, int outputLimit) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = rDesc;
+ this.outputLimit = outputLimit;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx, IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) throws HyracksDataException {
+
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private FrameTupleAccessor fta;
+ private int currentSize;
+ private boolean finished;
+
+ @Override
+ public void open() throws HyracksDataException {
+ fta = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptors[0]);
+ currentSize = 0;
+ finished = false;
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ if (!finished) {
+ fta.reset(buffer);
+ int count = fta.getTupleCount();
+ if ((currentSize + count) > outputLimit) {
+ ByteBuffer b = ctx.allocateFrame();
+ FrameTupleAppender partialAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ partialAppender.reset(b, true);
+ int copyCount = outputLimit - currentSize;
+ for (int i = 0; i < copyCount; i++) {
+ partialAppender.append(fta, i);
+ currentSize++;
+ }
+ FrameUtils.makeReadable(b);
+ FrameUtils.flushFrame(b, writer);
+ finished = true;
+ } else {
+ FrameUtils.flushFrame(buffer, writer);
+ currentSize += count;
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+ };
+ }
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
new file mode 100644
index 0000000..c7aa1a1
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.PrintStream;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
+
+
+//By Pouria
+
+public class OptimizedSortMergeTest extends AbstractIntegrationTest {
+
+ @Test
+ public void optimizedSortMergeTest01() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, printer, 0);
+
+ runTest(spec);
+ }
+
+ @Test
+ public void optimizedSortMergeTest02() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ int outputLimit = 15;
+ OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, filter, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
+
+ runTest(spec);
+ }
+}
\ No newline at end of file