VariableSizeFrame(VSizeFrame) support for Hyracks.
This patch replaces the Frame/Accessor/Appender classes with a new API that
supports big objects (BigObject).
The ExternalSorter/TopKSorter/ExternalGroupSorter
have been implemented to support big objects.
Group-by and Join should also work with big objects, but they will exceed the
memory budget when they encounter one. I will fix this memory
problem later in a separate CR.
The design of the frame allocation is described here:
https://docs.google.com/presentation/d/15h9iQf5OYsgGZoQTbGHkj1yS2G9q2fd0s1lDAD1EJq0/edit?usp=sharing
Suggested review order:
Patch 12: It includes all of the sorting operators.
Patch 13: It applies the new IFrame API to all Hyracks code.
Patch 14: Some bug fixes to pass all of Asterix's tests.
Patch 15: Skip it!
Patch 16: Some bug fixes for Asterix's tests in the small-frame setting.
Later patches: address the review comments.
Change-Id: I2e08692078683f6f2cf17387e39037ad851fc05b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/234
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore b/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore
new file mode 100644
index 0000000..be303ea
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore
@@ -0,0 +1,3 @@
+primary*/
+secondary*/
+inv*/
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 29be04b..5268150 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -24,8 +24,10 @@
import org.junit.Test;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
@@ -48,12 +50,12 @@
private final IHyracksTaskContext ctx;
private static final int FRAME_SIZE = 32768;
private RecordDescriptor rDes;
- private List<ByteBuffer> buffers;
+ private List<IFrame> buffers;
public SerDeserRunner(RecordDescriptor rDes) throws HyracksException {
ctx = TestUtils.create(FRAME_SIZE);
this.rDes = rDes;
- buffers = new ArrayList<ByteBuffer>();
+ buffers = new ArrayList<>();
}
public IOpenableDataWriter<Object[]> createWriter() throws HyracksDataException {
@@ -64,8 +66,8 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer toBuf = ctx.allocateFrame();
- toBuf.put(buffer);
+ IFrame toBuf = new VSizeFrame(ctx);
+ toBuf.getBuffer().put(buffer);
buffers.add(toBuf);
}
@@ -89,12 +91,12 @@
}
@Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public boolean nextFrame(IFrame frame) throws HyracksDataException {
if (i < buffers.size()) {
- ByteBuffer buf = buffers.get(i);
- buf.flip();
- buffer.put(buf);
- buffer.flip();
+ IFrame buf = buffers.get(i);
+ buf.getBuffer().flip();
+ frame.getBuffer().put(buf.getBuffer());
+ frame.getBuffer().flip();
++i;
return true;
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 1150cf3..30f89e2 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -15,10 +15,11 @@
package edu.uci.ics.hyracks.tests.integration;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -45,6 +47,7 @@
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -138,12 +141,11 @@
hcc.waitForCompletion(jobId);
}
+
protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception {
int nReaders = 1;
- ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
- resultBuffer.clear();
- IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
@@ -151,16 +153,18 @@
List<String> resultRecords = new ArrayList<String>();
ByteBufferInputStream bbis = new ByteBufferInputStream();
- int readSize = reader.read(resultBuffer);
+ FrameManager resultDisplayFrameMgr = new FrameManager(spec.getFrameSize());
+ VSizeFrame frame = new VSizeFrame(resultDisplayFrameMgr);
+ int readSize = reader.read(frame);
while (readSize > 0) {
try {
- frameTupleAccessor.reset(resultBuffer);
+ frameTupleAccessor.reset(frame.getBuffer());
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
int start = frameTupleAccessor.getTupleStartOffset(tIndex);
int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
- bbis.setByteBuffer(resultBuffer, start);
+ bbis.setByteBuffer(frame.getBuffer(), start);
byte[] recordBytes = new byte[length];
bbis.read(recordBytes, 0, length);
resultRecords.add(new String(recordBytes, 0, length));
@@ -169,8 +173,7 @@
bbis.close();
}
- resultBuffer.clear();
- readSize = reader.read(resultBuffer);
+ readSize = reader.read(frame);
}
return resultRecords;
}
@@ -198,6 +201,22 @@
return true;
}
+    /**
+     * Executes the given job and writes the records of all of its result sets to {@code file}.
+     * Records are written back to back, in result-set order, with no separator added between them.
+     *
+     * @param spec the job to run; its registered result set ids determine what is read
+     * @param file the file that receives the result records (overwritten if it exists)
+     * @throws Exception if the job cannot be executed, its results cannot be read,
+     *             or the file cannot be written
+     */
+    protected void runTestAndStoreResult(JobSpecification spec, File file) throws Exception {
+        JobId jobId = executeTest(spec);
+
+        // try-with-resources closes (and flushes) the writer even when reading or writing fails,
+        // so a failing test no longer leaks the open file handle.
+        try (BufferedWriter output = new BufferedWriter(new FileWriter(file))) {
+            for (ResultSetId resultSetId : spec.getResultSetIds()) {
+                for (String record : readResults(spec, jobId, resultSetId)) {
+                    output.write(record);
+                }
+            }
+        }
+
+        hcc.waitForCompletion(jobId);
+    }
+
protected File createTempFile() throws IOException {
File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 970f2fe..602e193 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -16,7 +16,6 @@
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -33,6 +32,7 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -44,6 +44,7 @@
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
@@ -66,7 +67,8 @@
public TemporaryFolder outputFolder = new TemporaryFolder();
public AbstractMultiNCIntegrationTest() {
- outputFiles = new ArrayList<File>();;
+ outputFiles = new ArrayList<File>();
}
@BeforeClass
@@ -124,10 +126,10 @@
int nReaders = 1;
- ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
- resultBuffer.clear();
+ FrameManager resultDisplayFrameMgr = new FrameManager(spec.getFrameSize());
+ VSizeFrame resultFrame = new VSizeFrame(resultDisplayFrameMgr);
- IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
@@ -135,16 +137,16 @@
JSONArray resultRecords = new JSONArray();
ByteBufferInputStream bbis = new ByteBufferInputStream();
- int readSize = reader.read(resultBuffer);
+ int readSize = reader.read(resultFrame);
while (readSize > 0) {
try {
- frameTupleAccessor.reset(resultBuffer);
+ frameTupleAccessor.reset(resultFrame.getBuffer());
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
int start = frameTupleAccessor.getTupleStartOffset(tIndex);
int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
- bbis.setByteBuffer(resultBuffer, start);
+ bbis.setByteBuffer(resultFrame.getBuffer(), start);
byte[] recordBytes = new byte[length];
bbis.read(recordBytes, 0, length);
resultRecords.put(new String(recordBytes, 0, length));
@@ -157,8 +159,7 @@
}
}
- resultBuffer.clear();
- readSize = reader.read(resultBuffer);
+ readSize = reader.read(resultFrame);
}
hcc.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index a2ef99a..9b77ec5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -42,9 +42,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -72,17 +73,22 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
- OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4,
- new int[] { 1, 0 }, new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+ int outputLimit = 5; // larger than the total record numbers.
+ TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
+ outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+ File file = File.createTempFile(getClass().getName(), ".tmp");
+ IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, file.getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -98,6 +104,7 @@
new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
runTest(spec);
+ System.out.println("Result write into :" + file.getAbsolutePath());
}
@Test
@@ -123,11 +130,11 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
- int outputLimit = 200;
- OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4,
- outputLimit, new int[] { 1, 0 }, new IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+ int outputLimit = 20;
+ TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
+ outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java
new file mode 100644
index 0000000..212a99c
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+
+/**
+ * Integration test that scans the two orders partitions, sorts them with an
+ * {@link ExternalSortOperatorDescriptor} and merges the sorted partitions into a single
+ * delimited output file, for several frame-size / frame-limit combinations.
+ */
+public class VSizeFrameSortMergeTest extends AbstractIntegrationTest {
+
+    /** Number of fields in an orders record; every field is handled as a UTF-8 string. */
+    private static final int ORDERS_FIELD_COUNT = 9;
+
+    public static String[] INPUTS = { "data/tpch0.001/orders-part1.tbl", "data/tpch0.001/orders-part2.tbl" };
+
+    FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new FileReference(new File(INPUTS[0]))),
+            new FileSplit(NC2_ID, new FileReference(new File(INPUTS[1]))) };
+    IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+    RecordDescriptor ordersDesc = new RecordDescriptor(utf8SerDes());
+
+    /** Sorts with frame sizes 1024 and 256, each with a frame limit of 4. */
+    @Test
+    public void sortNormalMergeTest() throws Exception {
+        runSortTasks(new int[][] { { 1024, 4 }, { 256, 4 } });
+    }
+
+    /** Sorts with very small frames (32 and 16) and frame limits of 128, 256 and 10240. */
+    @Test
+    public void sortLargeMergeTest() throws Exception {
+        runSortTasks(new int[][] { { 32, 128 }, { 16, 256 }, { 16, 10240 } });
+    }
+
+    /**
+     * Builds and runs one scan -> sort -> merge -> file-writer job.
+     *
+     * @param frameSize the frame size set on the job specification
+     * @param frameLimit the frame limit handed to the external sort operator
+     */
+    public void sortTask(int frameSize, int frameLimit) throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        DelimitedDataTupleParserFactory parserFactory = new DelimitedDataTupleParserFactory(utf8Parsers(), '|');
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                parserFactory, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        spec.setFrameSize(frameSize);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, sortKeys(),
+                utf8Comparators(), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        File file = File.createTempFile(getClass().getName(), ".tmp");
+
+        FileSplit[] outputSplits = new FileSplit[] { new FileSplit(NC1_ID, file.getAbsolutePath()) };
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(outputSplits);
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+        // Partition and merge on the same key fields the sorter uses.
+        FieldHashPartitionComputerFactory partitioner = new FieldHashPartitionComputerFactory(sortKeys(),
+                new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) });
+        MToNPartitioningMergingConnectorDescriptor merger = new MToNPartitioningMergingConnectorDescriptor(spec,
+                partitioner, sortKeys(), utf8Comparators(), new UTF8StringNormalizedKeyComputerFactory());
+        spec.connect(merger, sorter, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("Result write into :" + file.getAbsolutePath());
+    }
+
+    /** Runs {@link #sortTask(int, int)} once per {frameSize, frameLimit} pair, in order. */
+    private void runSortTasks(int[][] frameSizeAndLimitPairs) throws Exception {
+        for (int[] pair : frameSizeAndLimitPairs) {
+            sortTask(pair[0], pair[1]);
+        }
+    }
+
+    /** Returns a fresh copy of the sort key field indexes. */
+    private static int[] sortKeys() {
+        return new int[] { 1, 0 };
+    }
+
+    /** Returns a fresh pair of UTF-8 string comparator factories, one per sort key. */
+    private static IBinaryComparatorFactory[] utf8Comparators() {
+        return new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+    }
+
+    /** Returns one UTF-8 string serializer/deserializer per orders field. */
+    private static ISerializerDeserializer[] utf8SerDes() {
+        ISerializerDeserializer[] serDes = new ISerializerDeserializer[ORDERS_FIELD_COUNT];
+        for (int i = 0; i < serDes.length; i++) {
+            serDes[i] = UTF8StringSerializerDeserializer.INSTANCE;
+        }
+        return serDes;
+    }
+
+    /** Returns one UTF-8 string value parser factory per orders field. */
+    private static IValueParserFactory[] utf8Parsers() {
+        IValueParserFactory[] parsers = new IValueParserFactory[ORDERS_FIELD_COUNT];
+        for (int i = 0; i < parsers.length; i++) {
+            parsers[i] = UTF8StringParserFactory.INSTANCE;
+        }
+        return parsers;
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java
new file mode 100644
index 0000000..c4faa1a
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+/**
+ * Shared test cases for sort run generators. Subclasses supply the generator under test through
+ * {@link #getSortRunGenerator(IHyracksTaskContext, int, int)}.
+ * <p>
+ * Test records have two fields, an integer key and a UTF-8 string value, and are sorted on
+ * both fields (key first). Keys are unique within one generated data set.
+ */
+public abstract class AbstractRunGeneratorTest {
+    static TestUtils testUtils = new TestUtils();
+    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+    static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
+    static Random GRandom = new Random(System.currentTimeMillis());
+    static int[] SortFields = new int[] { 0, 1 };
+    static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] {
+            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+    /** Asserts that every run reports exactly {@code pageSize} as its maximum frame size. */
+    static void assertMaxFrameSizesAreAllEqualsTo(List<RunAndMaxFrameSizePair> maxSize, int pageSize) {
+        for (RunAndMaxFrameSizePair run : maxSize) {
+            assertTrue("max frame size is " + run.maxFrameSize + " but expected " + pageSize,
+                    run.maxFrameSize == pageSize);
+        }
+    }
+
+    /**
+     * Creates the run generator under test.
+     *
+     * @param ctx the task context to run in
+     * @param frameLimit the number of frames the generator may use
+     * @param numOfInputRecord the number of records that will be fed to the generator
+     */
+    abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException;
+
+    /**
+     * Generates input frames totalling at least {@code pageSize * frameLimit * numRuns} bytes,
+     * feeds them to the run generator and verifies that the produced runs are sorted and contain
+     * exactly the generated records.
+     *
+     * @param specialData extra records that are each placed in a frame of their own, or {@code null}
+     * @return the runs produced by the generator
+     */
+    protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
+            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+
+        HashMap<Integer, String> keyValuePair = new HashMap<>();
+        List<IFrame> frameList = new ArrayList<>();
+        prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize,
+                specialData, keyValuePair);
+        AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
+        runGenerator.open();
+        for (IFrame frame : frameList) {
+            runGenerator.nextFrame(frame.getBuffer());
+        }
+        runGenerator.close();
+        matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+        return runGenerator.getRuns();
+    }
+
+    /**
+     * Reads all runs twice and checks them against {@code keyValuePair}: once through a plain
+     * {@link VSizeFrame} with a {@link FrameTupleAccessor}, and once through a
+     * {@link GroupVSizeFrame} (sized to the largest reported run frame) with a
+     * {@link GroupFrameAccessor}. The given map is not modified; each pass works on a copy.
+     */
+    static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
+            Map<Integer, String> keyValuePair) throws HyracksDataException {
+        IFrame frame = new VSizeFrame(ctx);
+        FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
+
+        HashMap<Integer, String> copyMap = new HashMap<>(keyValuePair);
+        assertReadSorted(runs, fta, frame, copyMap);
+
+        HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
+        int maxFrameSizes = 0;
+        for (RunAndMaxFrameSizePair run : runs) {
+            maxFrameSizes = Math.max(maxFrameSizes, run.maxFrameSize);
+        }
+        GroupVSizeFrame gframe = new GroupVSizeFrame(ctx, maxFrameSizes);
+        GroupFrameAccessor gfta = new GroupFrameAccessor(ctx.getInitialFrameSize(), RecordDesc);
+        assertReadSorted(runs, gfta, gframe, copyMap2);
+    }
+
+    /**
+     * Checks every tuple visible through {@code fta}: its (key, value) pair must still be present
+     * in {@code keyValuePair} (the entry is removed once matched) and keys must be non-decreasing.
+     *
+     * @param preKey the last key seen before this frame
+     * @return the last key seen in this frame, or {@code preKey} if the frame holds no tuples
+     */
+    static int assertFTADataIsSorted(IFrameTupleAccessor fta, Map<Integer, String> keyValuePair, int preKey)
+            throws HyracksDataException {
+
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream di = new DataInputStream(bbis);
+        for (int i = 0; i < fta.getTupleCount(); i++) {
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 0) + fta.getFieldSlotsLength());
+            int key = (int) RecordDesc.getFields()[0].deserialize(di);
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
+            String value = (String) RecordDesc.getFields()[1].deserialize(di);
+
+            // remove() yields null for an unknown or already-consumed key; comparing from the
+            // value side turns that case into an assertion failure instead of a NullPointerException.
+            String expected = keyValuePair.remove(key);
+            assertTrue("unexpected, duplicated or mismatching record for key " + key, value.equals(expected));
+            assertTrue("key " + key + " appears after larger key " + preKey, key >= preKey);
+            preKey = key;
+        }
+        return preKey;
+    }
+
+    /**
+     * Reads every run frame by frame into {@code frame} and asserts that each run is sorted on
+     * its own and that all runs together consume every entry of {@code keyValuePair}.
+     */
+    static void assertReadSorted(List<RunAndMaxFrameSizePair> runs, IFrameTupleAccessor fta, IFrame frame,
+            Map<Integer, String> keyValuePair) throws HyracksDataException {
+
+        assertTrue("no run was generated", runs.size() > 0);
+        for (RunAndMaxFrameSizePair run : runs) {
+            run.run.open();
+            // ordering is only guaranteed within a run, so restart the key tracking for each one
+            int preKey = Integer.MIN_VALUE;
+            while (run.run.nextFrame(frame)) {
+                fta.reset(frame.getBuffer());
+                preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+            }
+            run.run.close();
+        }
+        assertTrue(keyValuePair.size() + " generated record(s) are missing from the runs", keyValuePair.isEmpty());
+    }
+
+    /**
+     * Fills {@code frameList} with input frames and records every generated (key, value) pair in
+     * {@code keyValuePair}.
+     * <p>
+     * Each entry of {@code specialData} (if any) is stored in a dedicated frame sized to hold it.
+     * Random records with unique keys are then appended until the summed size of the completed
+     * frames reaches {@code minDataSize}; the final, partially filled frame is added as well.
+     */
+    static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
+            int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
+            throws HyracksDataException {
+
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        FrameTupleAppender appender = new FrameTupleAppender();
+
+        int datasize = 0;
+        if (specialData != null) {
+            for (Map.Entry<Integer, String> entry : specialData.entrySet()) {
+                tb.reset();
+                tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
+                tb.addField(UTF8StringSerializerDeserializer.INSTANCE, entry.getValue());
+
+                VSizeFrame frame = new VSizeFrame(ctx, FrameHelper
+                        .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+                appender.reset(frame, true);
+                assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
+                frameList.add(frame);
+                datasize += frame.getFrameSize();
+            }
+            keyValuePair.putAll(specialData);
+        }
+
+        VSizeFrame frame = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        appender.reset(frame, true);
+        while (datasize < minDataSize) {
+            tb.reset();
+            int key = GRandom.nextInt(minDataSize + 1);
+            // skip keys that were already used so that every key maps to exactly one value
+            if (!keyValuePair.containsKey(key)) {
+                String value = generateRandomRecord(minRecordSize, maxRecordSize);
+                tb.addField(IntegerSerializerDeserializer.INSTANCE, key);
+                tb.addField(UTF8StringSerializerDeserializer.INSTANCE, value);
+
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // current frame is full: keep it and start a new one sized for this record
+                    frameList.add(frame);
+                    datasize += frame.getFrameSize();
+                    frame = new VSizeFrame(ctx, FrameHelper
+                            .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(),
+                                    ctx.getInitialFrameSize()));
+                    appender.reset(frame, true);
+                    assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
+                }
+
+                keyValuePair.put(key, value);
+            }
+        }
+        if (appender.getTupleCount() > 0) {
+            frameList.add(frame);
+        }
+
+    }
+
+    /**
+     * Returns a random lower-case string for a size drawn uniformly from
+     * {@code [minRecordSize, maxRecordSize]}; see {@link #generateRandomFixSizedString(int)} for
+     * the resulting length.
+     */
+    static String generateRandomRecord(int minRecordSize, int maxRecordSize)
+            throws HyracksDataException {
+        int size = GRandom.nextInt(maxRecordSize - minRecordSize + 1) + minRecordSize;
+        return generateRandomFixSizedString(size);
+
+    }
+
+    /**
+     * Returns a string of random characters in {@code 'a'..'z'}.
+     * <p>
+     * NOTE(review): the loop condition is {@code size >= 0}, so for a non-negative {@code size}
+     * the result holds {@code size + 1} characters. This is kept as is because the frame-size
+     * expectations of the tests may rely on it - confirm before changing.
+     */
+    static String generateRandomFixSizedString(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        for (; size >= 0; --size) {
+            char ch = (char) (GRandom.nextInt(26) + 97);
+            sb.append(ch);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Builds {@code times - 1} records with random keys whose values are generated for sizes
+     * {@code pageSize * 1}, {@code pageSize * 2}, ..., {@code pageSize * (times - 1)}.
+     */
+    static HashMap<Integer, String> generateBigObject(int pageSize, int times) {
+        HashMap<Integer, String> map = new HashMap<>(1);
+        for (int i = 1; i < times; i++) {
+            map.put(GRandom.nextInt(), generateRandomFixSizedString(pageSize * i));
+        }
+        return map;
+    }
+
+    /** Records far smaller than a page: every run must report the initial page size as its maximum frame size. */
+    @Test
+    public void testAllSmallRecords() throws HyracksDataException {
+        int pageSize = 512;
+        int frameLimit = 4;
+        int numRuns = 2;
+        int minRecordSize = pageSize / 8;
+        int maxRecordSize = pageSize / 8;
+        List<RunAndMaxFrameSizePair> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+                maxRecordSize, null);
+        assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+    }
+
+    /** Records between one and 1.8 pages: every run must report two pages as its maximum frame size. */
+    @Test
+    public void testAllLargeRecords() throws HyracksDataException {
+        int pageSize = 2048;
+        int frameLimit = 4;
+        int numRuns = 2;
+        int minRecordSize = pageSize;
+        int maxRecordSize = (int) (pageSize * 1.8);
+        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+                null);
+        assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
+    }
+
+    /** Small records mixed with a few big objects: the largest run frame must be {@code frameLimit - 1} pages. */
+    @Test
+    public void testMixedLargeRecords() throws HyracksDataException {
+        int pageSize = 128;
+        int frameLimit = 4;
+        int numRuns = 4;
+        int minRecordSize = 20;
+        int maxRecordSize = pageSize / 2;
+        HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
+        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+                specialPair);
+
+        int max = 0;
+        for (RunAndMaxFrameSizePair run : size) {
+            max = Math.max(max, run.maxFrameSize);
+        }
+        assertTrue("largest run frame is " + max + " but expected " + pageSize * (frameLimit - 1),
+                max == pageSize * (frameLimit - 1));
+    }
+
+    /** Big objects generated for sizes up to {@code frameLimit - 1} pages must make the run generator fail. */
+    @Test(expected = HyracksDataException.class)
+    public void testTooBigRecordWillThrowException() throws HyracksDataException {
+        int pageSize = 1024;
+        int frameLimit = 8;
+        int numRuns = 8;
+        HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
+        int minRecordSize = 10;
+        int maxRecordSize = pageSize / 2;
+        // the returned runs are irrelevant here: the call itself is expected to throw
+        testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
new file mode 100644
index 0000000..4d7558b
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+
+/**
+ * Concrete {@link AbstractRunGeneratorTest} whose generator under test is an {@link ExternalSortRunGenerator}
+ * configured with {@link Algorithm#MERGE_SORT}.
+ */
+public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest {
+
+    /**
+     * Builds the generator under test. {@code numOfInputRecord} is accepted to satisfy the abstract signature but
+     * is not forwarded to the {@link ExternalSortRunGenerator} constructor.
+     */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        final Algorithm algorithm = Algorithm.MERGE_SORT;
+        return new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, RecordDesc, algorithm,
+                frameLimit);
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java
new file mode 100644
index 0000000..00eca70
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+
+/** Concrete {@link AbstractRunGeneratorTest} whose generator under test is a {@link HeapSortRunGenerator}. */
+public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest {
+
+    /** Builds the generator under test, forwarding {@code frameLimit} and {@code numOfInputRecord} unchanged. */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        return new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+    }
+
+    @Test
+    public void testTopK() {
+        // NOTE(review): placeholder without assertions, so it always passes. Either implement it or remove it.
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java
new file mode 100644
index 0000000..f7ecd5e
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+
+/**
+ * Concrete {@link AbstractRunGeneratorTest} whose generator under test is a {@link HybridTopKSortRunGenerator}.
+ * <p>
+ * NOTE(review): unlike its siblings this class name does not end in "Test"; runners that select classes by name
+ * pattern may skip it -- confirm against the build configuration.
+ */
+public class HybridSortRunGenerator extends AbstractRunGeneratorTest {
+
+    /** Builds the generator under test, forwarding {@code frameLimit} and {@code numOfInputRecord} unchanged. */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        return new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java
new file mode 100644
index 0000000..d5355b8
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.ComparatorFactories;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.GRandom;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.RecordDesc;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.SortFields;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.generateRandomRecord;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.matchResult;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.prepareData;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.testUtils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+
+public class RunMergingFrameReaderTest {
+    /** One comparator created from each of the first two entries of the shared {@code ComparatorFactories}. */
+    static IBinaryComparator[] Comparators = {
+            ComparatorFactories[0].createBinaryComparator(),
+            ComparatorFactories[1].createBinaryComparator() };
+
+    /**
+     * In-memory {@link IFrameReader} that plays the role of a sorted run. {@link #open()} regenerates
+     * {@code result} through {@code prepareSortedData}; {@link #nextFrame(IFrame)} then emits the entries in the
+     * iteration order of that {@link TreeMap}.
+     */
+    static class TestFrameReader implements IFrameReader {
+
+        private final int pageSize;
+        private final int numFrames;
+        private final int minRecordSize;
+        private final int maxRecordSize;
+        // Key/value pairs served by this reader; cleared and refilled on every open().
+        private TreeMap<Integer, String> result = new TreeMap<>();
+        // Equals pageSize after construction; recomputed in open() from the largest generated tuple.
+        int maxFrameSize;
+
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        FrameTupleAppender appender = new FrameTupleAppender();
+        private Iterator<Map.Entry<Integer, String>> iterator;
+        // Entry still waiting to be written; non-null when the previous append was refused.
+        private Map.Entry<Integer, String> lastEntry;
+
+        TestFrameReader(int pageSize, int numFrames, int minRecordSize, int maxRecordSize) {
+            this.pageSize = pageSize;
+            this.numFrames = numFrames;
+            this.minRecordSize = minRecordSize;
+            this.maxRecordSize = maxRecordSize;
+            this.maxFrameSize = pageSize;
+        }
+
+        /** Generates at least {@code numFrames * pageSize} bytes worth of tuples and rewinds the iterator. */
+        @Override
+        public void open() throws HyracksDataException {
+            result.clear();
+            int largestTuple = prepareSortedData(numFrames * pageSize, minRecordSize, maxRecordSize, null, result);
+            maxFrameSize = FrameHelper.calcAlignedFrameSizeToStore(0, largestTuple, pageSize);
+            iterator = result.entrySet().iterator();
+        }
+
+        /**
+         * Resets {@code frame} and appends entries until the appender refuses one or the data runs out.
+         *
+         * @return {@code false} when no entry was left before the call, {@code true} otherwise
+         */
+        @Override
+        public boolean nextFrame(IFrame frame) throws HyracksDataException {
+            if (lastEntry == null) {
+                if (!iterator.hasNext()) {
+                    return false;
+                }
+                lastEntry = iterator.next();
+            }
+            appender.reset(frame, true);
+            while (lastEntry != null) {
+                tb.reset();
+                tb.addField(IntegerSerializerDeserializer.INSTANCE, lastEntry.getKey());
+                tb.addField(UTF8StringSerializerDeserializer.INSTANCE, lastEntry.getValue());
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // The appender refused the tuple: keep lastEntry so the next call retries it.
+                    break;
+                }
+                lastEntry = iterator.hasNext() ? iterator.next() : null;
+            }
+            return true;
+        }
+
+        /** Debugging aid that pretty-prints the tuples in {@code buffer}; no live code in this class calls it. */
+        private void printFrame(ByteBuffer buffer) {
+            FrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
+            accessor.reset(buffer);
+            accessor.prettyPrint();
+        }
+
+        /** Nothing to release: all data lives in memory. */
+        @Override
+        public void close() throws HyracksDataException {
+        }
+    }
+
+    /**
+     * Fills {@code result} with the optional {@code specialData} plus randomly generated records until the
+     * accumulated tuple size reaches {@code minDataSize}.
+     * <p>
+     * A tuple's size is counted as the builder's data size plus four bytes per field-end offset. The random record
+     * that pushes the total to or past {@code minDataSize} is still counted (in the total and in the returned
+     * maximum) but is not stored in {@code result}.
+     *
+     * @param minDataSize   size threshold at which generation stops
+     * @param minRecordSize lower bound handed to {@code generateRandomRecord}
+     * @param maxRecordSize upper bound handed to {@code generateRandomRecord}
+     * @param specialData   entries to include verbatim, or {@code null} for none
+     * @param result        destination map; keys already present are never overwritten by random records
+     * @return the largest tuple size that was counted
+     */
+    static int prepareSortedData(int minDataSize, int minRecordSize, int maxRecordSize,
+            Map<Integer, String> specialData, Map<Integer, String> result) throws HyracksDataException {
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        int totalBytes = 0;
+        int largestTuple = 0;
+
+        if (specialData != null) {
+            for (Map.Entry<Integer, String> special : specialData.entrySet()) {
+                builder.reset();
+                builder.addField(IntegerSerializerDeserializer.INSTANCE, special.getKey());
+                builder.addField(UTF8StringSerializerDeserializer.INSTANCE, special.getValue());
+                int tupleBytes = builder.getSize() + builder.getFieldEndOffsets().length * 4;
+                totalBytes += tupleBytes;
+                largestTuple = Math.max(largestTuple, tupleBytes);
+            }
+            result.putAll(specialData);
+        }
+
+        while (totalBytes < minDataSize) {
+            String value = generateRandomRecord(minRecordSize, maxRecordSize);
+            builder.reset();
+            int key = GRandom.nextInt(totalBytes + 1);
+            if (result.containsKey(key)) {
+                // Key collision: draw a fresh record and key on the next iteration.
+                continue;
+            }
+            builder.addField(IntegerSerializerDeserializer.INSTANCE, key);
+            builder.addField(UTF8StringSerializerDeserializer.INSTANCE, value);
+            int tupleBytes = builder.getSize() + builder.getFieldEndOffsets().length * 4;
+            totalBytes += tupleBytes;
+            largestTuple = Math.max(largestTuple, tupleBytes);
+            if (totalBytes < minDataSize) {
+                result.put(key, value);
+            }
+        }
+
+        return largestTuple;
+    }
+
+    /** Merges a single generated run of one frame and checks that every generated record is emitted in order. */
+    @Test
+    public void testOnlyOneRunShouldMerge() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 1;
+        final int numFramesPerRun = 1;
+        final int smallestRecord = pageSize / 10;
+        final int largestRecord = pageSize / 8;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<TestFrameReader> readerList = new ArrayList<>(numRuns);
+        List<IFrame> frameList = new ArrayList<>(numRuns);
+        List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, smallestRecord, largestRecord,
+                readerList, frameList, keyValueMapList);
+
+        RunMergingFrameReader merger =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, keyValueMapList);
+    }
+
+    /** Merges two generated runs of two frames each and checks that every generated record is emitted in order. */
+    @Test
+    public void testNormalRunMerge() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 2;
+        final int numFramesPerRun = 2;
+        final int smallestRecord = pageSize / 10;
+        final int largestRecord = pageSize / 8;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<TestFrameReader> readerList = new ArrayList<>(numRuns);
+        List<IFrame> frameList = new ArrayList<>(numRuns);
+        List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, smallestRecord, largestRecord,
+                readerList, frameList, keyValueMapList);
+
+        RunMergingFrameReader merger =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, keyValueMapList);
+    }
+
+    /**
+     * Merges two generated runs while passing {@code topK} as the last {@link RunMergingFrameReader} constructor
+     * argument, for every {@code topK} from 1 up to (excluding)
+     * {@code pageSize * numRuns * numFramesPerRun / maxRecordSize / 2}. After each merge, the records removed from
+     * the expected maps must number exactly {@code topK}.
+     */
+    @Test
+    public void testNormalRunMergeWithTopK() throws HyracksDataException {
+
+        int pageSize = 128;
+        int numRuns = 2;
+        int numFramesPerRun = 2;
+        int minRecordSize = pageSize / 10;
+        int maxRecordSize = pageSize / 8;
+
+        for (int topK = 1; topK < pageSize * numRuns * numFramesPerRun / maxRecordSize / 2; topK++) {
+            IHyracksTaskContext ctx = testUtils.create(pageSize);
+            List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
+            List<TestFrameReader> readerList = new ArrayList<>(numRuns);
+            List<IFrame> frameList = new ArrayList<>(numRuns);
+            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
+                    minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+
+            RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
+                    Comparators, null, RecordDesc, topK);
+            int totalCount = testMergeSucceedInner(ctx, reader, keyValueMapList);
+            int remainingCount = 0;
+            for (Map<Integer, String> remaining : keyValueMapList) {
+                remainingCount += remaining.size();
+            }
+            // Each merged record is removed from its map, so consumed (topK) + remaining must equal the total.
+            assertEquals(topK + remainingCount, totalCount);
+            // testMergeSucceedInner leaves the reader open; close it like the other merge tests do.
+            reader.close();
+        }
+    }
+
+    /**
+     * Drains {@code reader}, verifying each frame, then asserts that no expected record is left over and closes
+     * the reader.
+     */
+    private void testMergeSucceed(IHyracksTaskContext ctx, RunMergingFrameReader reader,
+            List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
+        // The returned record count is not needed here.
+        testMergeSucceedInner(ctx, reader, keyValueMapList);
+        assertAllKeyValueIsConsumed(keyValueMapList);
+        reader.close();
+    }
+
+    /**
+     * Opens {@code reader}, pulls frames until it is exhausted and checks every frame with
+     * {@link #assertFrameIsSorted}. The reader is left open.
+     *
+     * @return the number of expected records, counted after {@code open()} and before any frame is read
+     */
+    private int testMergeSucceedInner(IHyracksTaskContext ctx, RunMergingFrameReader reader,
+            List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
+        IFrame outFrame = new VSizeFrame(ctx);
+        reader.open();
+
+        // Swap each map for a private copy: assertFrameIsSorted removes entries, and the maps supplied by
+        // prepareRandomInputRunList are the very ones TestFrameReader is iterating over.
+        int expectedRecords = 0;
+        for (int run = 0; run < keyValueMapList.size(); run++) {
+            keyValueMapList.set(run, new TreeMap<>(keyValueMapList.get(run)));
+            expectedRecords += keyValueMapList.get(run).size();
+        }
+
+        while (reader.nextFrame(outFrame)) {
+            assertFrameIsSorted(outFrame, keyValueMapList);
+        }
+        return expectedRecords;
+    }
+
+    /**
+     * Merges six generated runs built in three batches of two: small records in one frame, page-sized records in
+     * four frames, and records of two pages in six frames (sizes as passed to {@code prepareRandomInputRunList}).
+     */
+    @Test
+    public void testOneLargeRunMerge() throws HyracksDataException {
+        final int pageSize = 64;
+        final int runsPerBatch = 2;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<Map<Integer, String>> keyValueMap = new ArrayList<>();
+        List<TestFrameReader> readerList = new ArrayList<>();
+        List<IFrame> frameList = new ArrayList<>();
+
+        // Batch 1: records well below the page size.
+        prepareRandomInputRunList(ctx, pageSize, runsPerBatch, 1, pageSize / 10, pageSize / 8,
+                readerList, frameList, keyValueMap);
+        // Batch 2: every record is requested with exactly one page as both bounds.
+        prepareRandomInputRunList(ctx, pageSize, runsPerBatch, 4, pageSize, pageSize,
+                readerList, frameList, keyValueMap);
+        // Batch 3: every record is requested with exactly two pages as both bounds.
+        prepareRandomInputRunList(ctx, pageSize, runsPerBatch, 6, pageSize * 2, pageSize * 2,
+                readerList, frameList, keyValueMap);
+
+        RunMergingFrameReader merger =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, keyValueMap);
+    }
+
+    /**
+     * End-to-end check: pushes two batches of generated frames through an {@link ExternalSortRunGenerator}, then
+     * merges the runs it produced with a {@link RunMergingFrameReader} and verifies the merged output.
+     */
+    @Test
+    public void testRunFileReader() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 4;
+        final int sortFrameLimit = 4;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, sortFrameLimit);
+        runGenerator.open();
+
+        Map<Integer, String> keyValuePair = new HashMap<>();
+        List<IFrame> frameList = new ArrayList<>();
+
+        // First batch: records between a tenth and half of a page.
+        prepareData(ctx, frameList, pageSize * sortFrameLimit * numRuns, pageSize / 10, pageSize / 2, null,
+                keyValuePair);
+        for (IFrame input : frameList) {
+            runGenerator.nextFrame(input.getBuffer());
+        }
+
+        // Second batch: page-sized records, added to the same expected map.
+        frameList.clear();
+        prepareData(ctx, frameList, pageSize * 2 * numRuns, pageSize, pageSize, null, keyValuePair);
+        for (IFrame input : frameList) {
+            runGenerator.nextFrame(input.getBuffer());
+        }
+        runGenerator.close();
+
+        // One input frame per run, sized by that run's recorded maximum frame size.
+        List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
+        List<IFrameReader> runs = new ArrayList<>();
+        for (RunAndMaxFrameSizePair pair : runGenerator.getRuns()) {
+            inFrame.add(new GroupVSizeFrame(ctx, pair.maxFrameSize));
+            runs.add(pair.run);
+        }
+        matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+
+        RunMergingFrameReader merger =
+                new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, RecordDesc);
+        IFrame outFrame = new VSizeFrame(ctx);
+        merger.open();
+        while (merger.nextFrame(outFrame)) {
+            assertFrameIsSorted(outFrame, Arrays.asList(keyValuePair));
+        }
+        merger.close();
+        assertAllKeyValueIsConsumed(Arrays.asList(keyValuePair));
+    }
+
+    /** Asserts that every map in {@code keyValueMapList} is empty, i.e. no expected record was left unmatched. */
+    private void assertAllKeyValueIsConsumed(List<Map<Integer, String>> keyValueMapList) {
+        for (int i = 0; i < keyValueMapList.size(); i++) {
+            assertTrue(keyValueMapList.get(i).isEmpty());
+        }
+    }
+
+ private void assertFrameIsSorted(IFrame frame, List<Map<Integer, String>> keyValueMapList)
+ throws HyracksDataException {
+ FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
+
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ fta.reset(frame.getBuffer());
+ // fta.prettyPrint();
+ int preKey = Integer.MIN_VALUE;
+ for (int i = 0; i < fta.getTupleCount(); i++) {
+ bbis.setByteBuffer(fta.getBuffer(),
+ fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 0) + fta.getFieldSlotsLength());
+ int key = (int) RecordDesc.getFields()[0].deserialize(di);
+ bbis.setByteBuffer(fta.getBuffer(),
+ fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
+ String value = (String) RecordDesc.getFields()[1].deserialize(di);
+
+ boolean found = false;
+ for (Map<Integer, String> map : keyValueMapList) {
+ if (map.containsKey(key) && map.get(key).equals(value)) {
+ found = true;
+ map.remove(key);
+ break;
+ }
+ }
+ assertTrue(found);
+ assertTrue(preKey <= key);
+ preKey = key;
+ }
+ }
+
+    /**
+     * Appends {@code numRuns} new {@link TestFrameReader}s to {@code readerList}, and for each one a
+     * {@link VSizeFrame} to {@code frameList} and the reader's own {@code result} map to {@code keyValueMap}.
+     * <p>
+     * The readers are not opened here, so each frame is created with the reader's initial {@code maxFrameSize}
+     * (the page size) and each map is still empty; the maps fill up once the reader is opened.
+     */
+    static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns,
+            int numFramesPerRun, int minRecordSize, int maxRecordSize,
+            List<TestFrameReader> readerList, List<IFrame> frameList, List<Map<Integer, String>> keyValueMap)
+            throws HyracksDataException {
+        for (int run = 0; run < numRuns; run++) {
+            TestFrameReader reader = new TestFrameReader(pageSize, numFramesPerRun, minRecordSize, maxRecordSize);
+            readerList.add(reader);
+            frameList.add(new VSizeFrame(ctx, reader.maxFrameSize));
+            keyValueMap.add(reader.result);
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java
new file mode 100644
index 0000000..ae0397b
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+
+public class TopKRunGeneratorTest {
+
+    // Frame size handed to testUtils.create(...) for every test context in this class.
+    static final int PAGE_SIZE = 512;
+    // PAGE_SIZE * NUM_PAGES is the capacity of the single input buffer built by prepareSortedData.
+    static final int NUM_PAGES = 80;
+    // Frame limit passed to every sorter constructed in this class.
+    static final int SORT_FRAME_LIMIT = 4;
+
+    // Key order of the input fed to the sorter; consumed by the switch in testInMemoryOnly.
+    enum ORDER {
+        // Input map uses the natural (ascending) key order.
+        INORDER,
+        // Input map uses Collections.reverseOrder().
+        REVERSE
+    }
+
+    /**
+     * {@link IFrameWriter} that validates sorter output: every frame is passed to
+     * {@code assertFTADataIsSorted} together with the expected {@code answer} map, carrying the last seen key
+     * from one frame to the next. {@link #close()} asserts that {@code answer} is empty, so
+     * {@code assertFTADataIsSorted} is presumably expected to remove each matched entry -- confirm in
+     * AbstractRunGeneratorTest.
+     * <p>
+     * Declared {@code static} because it never uses the enclosing test instance.
+     */
+    public static class InMemorySortDataValidator implements IFrameWriter {
+
+        Map<Integer, String> answer;
+        FrameTupleAccessor accessor;
+        int preKey = Integer.MIN_VALUE;
+
+        InMemorySortDataValidator(Map<Integer, String> answer) {
+            this.answer = answer;
+        }
+
+        /** Creates a fresh accessor and resets the last seen key. */
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(RecordDesc);
+            preKey = Integer.MIN_VALUE;
+        }
+
+        /** Validates one frame and remembers the key returned by the check for the next frame. */
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            preKey = assertFTADataIsSorted(accessor, answer, preKey);
+        }
+
+        /** No failure handling is needed for this in-memory validator. */
+        @Override
+        public void fail() throws HyracksDataException {
+
+        }
+
+        /** Asserts that no expected entry is left in {@code answer}. */
+        @Override
+        public void close() throws HyracksDataException {
+            assertTrue(answer.isEmpty());
+        }
+    }
+
+    /** Top-1 over reverse-ordered input with a {@link HeapSortRunGenerator}; no run may be generated. */
+    @Test
+    public void testReverseOrderedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
+        final int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        HeapSortRunGenerator heapSorter =
+                new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null, ComparatorFactories, RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, heapSorter);
+    }
+
+    /**
+     * Top-K (with K equal to {@code SORT_FRAME_LIMIT}) over already ordered input with a
+     * {@link HeapSortRunGenerator}; no run may be generated.
+     */
+    @Test
+    public void testAlreadySortedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
+        final int topK = SORT_FRAME_LIMIT;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        HeapSortRunGenerator heapSorter =
+                new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null, ComparatorFactories, RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.INORDER, heapSorter);
+    }
+
+    /** Top-1 over reverse-ordered input with a {@link HybridTopKSortRunGenerator}; no run may be generated. */
+    @Test
+    public void testHybridTopKShouldNotGenerateAnyRuns() throws HyracksDataException {
+        final int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        AbstractSortRunGenerator hybridSorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, hybridSorter);
+    }
+
+    /**
+     * Constructs a {@link HybridTopKSortRunGenerator} and does nothing else.
+     * <p>
+     * NOTE(review): there are no assertions yet, so the behaviour named in the method title is not verified.
+     */
+    @Test
+    public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
+        final int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        AbstractSortRunGenerator hybridSorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+    }
+
+ private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter)
+ throws HyracksDataException {
+ Map<Integer, String> keyValuePair = null;
+ switch (order) {
+ case INORDER:
+ keyValuePair = new TreeMap<>();
+ break;
+ case REVERSE:
+ keyValuePair = new TreeMap<>(Collections.reverseOrder());
+ break;
+ }
+
+ List<IFrame> frameList = new ArrayList<>();
+ int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5;
+ int minRecordSize = 16;
+ int maxRecordSize = 64;
+
+ AbstractRunGeneratorTest
+ .prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null, keyValuePair);
+
+ assert topK > 0;
+
+ ByteBuffer buffer = prepareSortedData(keyValuePair);
+
+ Map<Integer, String> topKAnswer = getTopKAnswer(keyValuePair, topK);
+
+ doSort(sorter, buffer);
+
+ assertEquals(0, sorter.getRuns().size());
+ validateResult(sorter, topKAnswer);
+ }
+
+    /** Flushes the sorter's in-memory content into an {@link InMemorySortDataValidator} built on {@code topKAnswer}. */
+    private void validateResult(AbstractSortRunGenerator sorter, Map<Integer, String> topKAnswer)
+            throws HyracksDataException {
+        InMemorySortDataValidator checker = new InMemorySortDataValidator(topKAnswer);
+        checker.open();
+        sorter.getSorter().flush(checker);
+        checker.close();
+    }
+
+    /** Runs the open / nextFrame / close sequence on {@code sorter} with {@code buffer} as its only input frame. */
+    private void doSort(AbstractSortRunGenerator sorter, ByteBuffer buffer) throws HyracksDataException {
+        sorter.open();
+        sorter.nextFrame(buffer);
+        sorter.close();
+    }
+
+    /**
+     * Returns the {@code topK} entries of {@code keyValuePair} with the smallest keys (natural order), or all
+     * entries when the map holds fewer. The input map is copied and left untouched.
+     */
+    private Map<Integer, String> getTopKAnswer(Map<Integer, String> keyValuePair, int topK) {
+        // Re-sort in natural key order regardless of the comparator used by the input map.
+        TreeMap<Integer, String> ascending = new TreeMap<>(keyValuePair);
+        Map<Integer, String> smallest = new TreeMap<>();
+        for (Map.Entry<Integer, String> candidate : ascending.entrySet()) {
+            if (smallest.size() >= topK) {
+                break;
+            }
+            smallest.put(candidate.getKey(), candidate.getValue());
+        }
+        return smallest;
+    }
+
+    /**
+     * Serializes every entry of {@code keyValuePair}, in the map's iteration order, into one fixed-size buffer of
+     * {@code PAGE_SIZE * NUM_PAGES} bytes.
+     *
+     * @param keyValuePair entries to write
+     * @return the buffer holding all appended tuples
+     * @throws HyracksDataException if an entry cannot be appended; the expected answer is derived from the whole
+     *                              map, so a silently dropped record would invalidate the comparison
+     */
+    private ByteBuffer prepareSortedData(Map<Integer, String> keyValuePair) throws HyracksDataException {
+        ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE * NUM_PAGES);
+        IFrame inputFrame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(inputFrame, true);
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+
+        for (Map.Entry<Integer, String> entry : keyValuePair.entrySet()) {
+            builder.reset();
+            builder.addField(SerDers[0], entry.getKey());
+            builder.addField(SerDers[1], entry.getValue());
+            if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                throw new HyracksDataException("Input buffer of " + buffer.capacity()
+                        + " bytes cannot hold all " + keyValuePair.size() + " generated records");
+            }
+        }
+        return buffer;
+    }
+}