Add classes for testing external sort with replacement selection

LimitOperatorDescriptor truncates a tuple stream after a fixed number of
tuples; OptimizedSortMergeTest exercises the optimized external sort
(sort with replacement selection), both with and without an output limit.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@817 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
new file mode 100644
index 0000000..939935f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
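+/**
+ * A single-activity operator that forwards tuples from its input to its
+ * output until a fixed limit is reached, then drops the remainder of the
+ * stream. Used to cap the output of the optimized external sort in tests.
+ */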
+public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+	private static final long serialVersionUID = 1L;
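+	// Maximum number of tuples to emit before the stream is cut off.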
+	private final int outputLimit;
+
+	public LimitOperatorDescriptor(JobSpecification spec,
+			RecordDescriptor rDesc, int outputLimit) {
+		super(spec, 1, 1);
+		recordDescriptors[0] = rDesc;
+		this.outputLimit = outputLimit;
+	}
+
+	@Override
+	public IOperatorNodePushable createPushRuntime(
+			final IHyracksTaskContext ctx, IOperatorEnvironment env,
+			final IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) throws HyracksDataException {
+		return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+			private FrameTupleAccessor fta;
+			private int currentSize;
+			private boolean finished;
+
+			@Override
+			public void open() throws HyracksDataException {
+				fta = new FrameTupleAccessor(ctx.getFrameSize(),
+						recordDescriptors[0]);
+				currentSize = 0;
+				finished = false;
+				writer.open();
+			}
+
+			@Override
+			public void nextFrame(ByteBuffer buffer)
+					throws HyracksDataException {
+				if (!finished) {
+					fta.reset(buffer);
+					int count = fta.getTupleCount();
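+					// This frame crosses the limit: copy only the first
+					// (outputLimit - currentSize) tuples into a fresh frame.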
+					if ((currentSize + count) > outputLimit) {
+						ByteBuffer b = ctx.allocateFrame();
+						FrameTupleAppender partialAppender = new FrameTupleAppender(
+								ctx.getFrameSize());
+						partialAppender.reset(b, true);
+						int copyCount = outputLimit - currentSize;
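+						// append cannot fail here: the copied tuples are a subset of a frame of the same size.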
+						for (int i = 0; i < copyCount; i++) {
+							partialAppender.append(fta, i);
+							currentSize++;
+						}
+						FrameUtils.makeReadable(b);
+						FrameUtils.flushFrame(b, writer);
+						finished = true;
+					} else {
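+						// The whole frame fits under the limit; forward it unchanged.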
+						FrameUtils.flushFrame(buffer, writer);
+						currentSize += count;
+					}
+				}
+			}
+
+			@Override
+			public void fail() throws HyracksDataException {
+				writer.fail();
+			}
+
+			@Override
+			public void close() throws HyracksDataException {
+				writer.close();
+			}
+		};
+	}
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
new file mode 100644
index 0000000..c7aa1a1
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
+
+/**
+ * Integration tests for the optimized external sort (external sort with
+ * replacement selection), run with and without an output limit.
+ *
+ * @author Pouria
+ */
+public class OptimizedSortMergeTest extends AbstractIntegrationTest {
+
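+    // Sorts both partitions of the orders file on fields 1 and 0, then merges
+    // the sorted streams into a single printer.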
+    @Test
+    public void optimizedSortMergeTest01() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
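+        // Merge the per-node sorted streams into one globally sorted stream at the printer.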
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, printer, 0);
+
+        runTest(spec);
+    }
+
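+    // Same pipeline, but the sorter is created with an outputLimit of 15 and
+    // a LimitOperatorDescriptor truncates the merged stream to at most 15
+    // tuples before printing.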
+    @Test
+    public void optimizedSortMergeTest02() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        int outputLimit = 15;
+        OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
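+        // Merge the sorted partitions; the limit operator then truncates the merged stream.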
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                        UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, filter, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
+
+        runTest(spec);
+    }
+}
\ No newline at end of file