VariableSizeFrame (VSizeFrame) support for Hyracks.

This patch replaces Frame/Accessor/Appender with the new API, which
supports BigObject.
The ExternalSorter/TopKSorter/ExternalGroupSorter
have been implemented to support big objects.

Groupby and Join should also work with BigObject, but they will break
the memory budget when they encounter a big object. I will fix the
memory problem later in a separate CR.

The design of the frame allocation is described here:
https://docs.google.com/presentation/d/15h9iQf5OYsgGZoQTbGHkj1yS2G9q2fd0s1lDAD1EJq0/edit?usp=sharing

Suggested review order:
Patch 12: It includes all of the sorting operators.
Patch 13: It applies the new IFrame API to all Hyracks code.
Patch 14: Some bug fixes to pass all of Asterix's tests.
Patch 15: Skip it!
Patch 16: Some bug fixes for Asterix's tests in the small-frame setting.
Later patches: address the review comments.

Change-Id: I2e08692078683f6f2cf17387e39037ad851fc05b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/234
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore b/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore
new file mode 100644
index 0000000..be303ea
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/.gitignore
@@ -0,0 +1,3 @@
+primary*/
+secondary*/
+inv*/
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 29be04b..5268150 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -24,8 +24,10 @@
 
 import org.junit.Test;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
@@ -48,12 +50,12 @@
         private final IHyracksTaskContext ctx;
         private static final int FRAME_SIZE = 32768;
         private RecordDescriptor rDes;
-        private List<ByteBuffer> buffers;
+        private List<IFrame> buffers;
 
         public SerDeserRunner(RecordDescriptor rDes) throws HyracksException {
             ctx = TestUtils.create(FRAME_SIZE);
             this.rDes = rDes;
-            buffers = new ArrayList<ByteBuffer>();
+            buffers = new ArrayList<>();
         }
 
         public IOpenableDataWriter<Object[]> createWriter() throws HyracksDataException {
@@ -64,8 +66,8 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer toBuf = ctx.allocateFrame();
-                    toBuf.put(buffer);
+                    IFrame toBuf = new VSizeFrame(ctx);
+                    toBuf.getBuffer().put(buffer);
                     buffers.add(toBuf);
                 }
 
@@ -89,12 +91,12 @@
                 }
 
                 @Override
-                public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                public boolean nextFrame(IFrame frame) throws HyracksDataException {
                     if (i < buffers.size()) {
-                        ByteBuffer buf = buffers.get(i);
-                        buf.flip();
-                        buffer.put(buf);
-                        buffer.flip();
+                        IFrame buf = buffers.get(i);
+                        buf.getBuffer().flip();
+                        frame.getBuffer().put(buf.getBuffer());
+                        frame.getBuffer().flip();
                         ++i;
                         return true;
                     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 1150cf3..30f89e2 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -15,10 +15,11 @@
 package edu.uci.ics.hyracks.tests.integration;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -45,6 +47,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
@@ -138,12 +141,11 @@
         hcc.waitForCompletion(jobId);
     }
 
+
     protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception {
         int nReaders = 1;
-        ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
-        resultBuffer.clear();
 
-        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
         IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
         IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, resultSetId);
@@ -151,16 +153,18 @@
         List<String> resultRecords = new ArrayList<String>();
         ByteBufferInputStream bbis = new ByteBufferInputStream();
 
-        int readSize = reader.read(resultBuffer);
+        FrameManager resultDisplayFrameMgr = new FrameManager(spec.getFrameSize());
+        VSizeFrame frame = new VSizeFrame(resultDisplayFrameMgr);
+        int readSize = reader.read(frame);
 
         while (readSize > 0) {
 
             try {
-                frameTupleAccessor.reset(resultBuffer);
+                frameTupleAccessor.reset(frame.getBuffer());
                 for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
                     int start = frameTupleAccessor.getTupleStartOffset(tIndex);
                     int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
-                    bbis.setByteBuffer(resultBuffer, start);
+                    bbis.setByteBuffer(frame.getBuffer(), start);
                     byte[] recordBytes = new byte[length];
                     bbis.read(recordBytes, 0, length);
                     resultRecords.add(new String(recordBytes, 0, length));
@@ -169,8 +173,7 @@
                 bbis.close();
             }
 
-            resultBuffer.clear();
-            readSize = reader.read(resultBuffer);
+            readSize = reader.read(frame);
         }
         return resultRecords;
     }
@@ -198,6 +201,22 @@
         return true;
     }
 
+    protected void runTestAndStoreResult(JobSpecification spec, File file) throws Exception {
+        JobId jobId = executeTest(spec);
+
+        BufferedWriter output = new BufferedWriter(new FileWriter(file));
+        List<String> results;
+        for (int i = 0; i < spec.getResultSetIds().size(); i++) {
+            results = readResults(spec, jobId, spec.getResultSetIds().get(i));
+            for(String str : results) {
+                output.write(str);
+            }
+        }
+        output.close();
+
+        hcc.waitForCompletion(jobId);
+    }
+
     protected File createTempFile() throws IOException {
         File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
         if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 970f2fe..602e193 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -16,7 +16,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -33,6 +32,7 @@
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -44,6 +44,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.resources.memory.FrameManager;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
@@ -66,7 +67,8 @@
     public TemporaryFolder outputFolder = new TemporaryFolder();
 
     public AbstractMultiNCIntegrationTest() {
-        outputFiles = new ArrayList<File>();;
+        outputFiles = new ArrayList<File>();
+        ;
     }
 
     @BeforeClass
@@ -124,10 +126,10 @@
 
         int nReaders = 1;
 
-        ByteBuffer resultBuffer = ByteBuffer.allocate(spec.getFrameSize());
-        resultBuffer.clear();
+        FrameManager resultDisplayFrameMgr = new FrameManager(spec.getFrameSize());
+        VSizeFrame resultFrame = new VSizeFrame(resultDisplayFrameMgr);
 
-        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(spec.getFrameSize());
+        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
 
         IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
         IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
@@ -135,16 +137,16 @@
         JSONArray resultRecords = new JSONArray();
         ByteBufferInputStream bbis = new ByteBufferInputStream();
 
-        int readSize = reader.read(resultBuffer);
+        int readSize = reader.read(resultFrame);
 
         while (readSize > 0) {
 
             try {
-                frameTupleAccessor.reset(resultBuffer);
+                frameTupleAccessor.reset(resultFrame.getBuffer());
                 for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
                     int start = frameTupleAccessor.getTupleStartOffset(tIndex);
                     int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
-                    bbis.setByteBuffer(resultBuffer, start);
+                    bbis.setByteBuffer(resultFrame.getBuffer(), start);
                     byte[] recordBytes = new byte[length];
                     bbis.read(recordBytes, 0, length);
                     resultRecords.put(new String(recordBytes, 0, length));
@@ -157,8 +159,7 @@
                 }
             }
 
-            resultBuffer.clear();
-            readSize = reader.read(resultBuffer);
+            readSize = reader.read(resultFrame);
         }
 
         hcc.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index a2ef99a..9b77ec5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -42,9 +42,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
 import edu.uci.ics.hyracks.tests.util.ResultSerializerFactoryProvider;
 
 public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -72,17 +73,22 @@
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4,
-                new int[] { 1, 0 }, new IBinaryComparatorFactory[] {
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        int outputLimit = 5; // larger than the total record numbers.
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
+                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+        File file = File.createTempFile(getClass().getName(), ".tmp");
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+                new FileSplit[] { new FileSplit(NC1_ID, file.getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -98,6 +104,7 @@
                         new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
+        System.out.println("Result write into :" + file.getAbsolutePath());
     }
 
     @Test
@@ -123,11 +130,11 @@
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
-        int outputLimit = 200;
-        OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4,
-                outputLimit, new int[] { 1, 0 }, new IBinaryComparatorFactory[] {
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        int outputLimit = 20;
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
+                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java
new file mode 100644
index 0000000..212a99c
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/VSizeFrameSortMergeTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+
+public class VSizeFrameSortMergeTest extends AbstractIntegrationTest {
+
+    public static String[] INPUTS = { "data/tpch0.001/orders-part1.tbl", "data/tpch0.001/orders-part2.tbl" };
+
+    FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new FileReference(new File(INPUTS[0]))),
+            new FileSplit(NC2_ID, new FileReference(new File(INPUTS[1]))) };
+    IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+    RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE });
+
+    @Test
+    public void sortNormalMergeTest() throws Exception {
+        sortTask(1024, 4);
+        sortTask(256, 4);
+    }
+
+    @Test
+    public void sortLargeMergeTest() throws Exception {
+        sortTask(32, 128);
+        sortTask(16, 256);
+        sortTask(16, 10240);
+    }
+
+    public void sortTask(int frameSize, int frameLimit) throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+        //                PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID );
+
+        spec.setFrameSize(frameSize);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        File file = File.createTempFile(getClass().getName(), ".tmp");
+
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+                new FileSplit[] { new FileSplit(NC1_ID, file.getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
+                        1, 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
+                        new IBinaryComparatorFactory[] {
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("Result write into :" + file.getAbsolutePath());
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java
new file mode 100644
index 0000000..c4faa1a
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+public abstract class AbstractRunGeneratorTest {
+    // Helper used to create the task contexts for the tests (see testUtils.create(pageSize) below).
+    static TestUtils testUtils = new TestUtils();
+    // Tuple layout shared by the tests: field 0 is an integer, field 1 is a UTF-8 string.
+    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+    static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
+    // Seeded with the current time, so the generated data differs from run to run.
+    static Random GRandom = new Random(System.currentTimeMillis());
+    // Field indexes handed to the sort run generators as the sort keys.
+    static int[] SortFields = new int[] { 0, 1 };
+    // One comparator factory per entry of SortFields: integer for field 0, UTF-8 string for field 1.
+    static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] {
+            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+    /**
+     * Asserts that every entry of {@code maxSize} reports a maximum frame size of exactly {@code pageSize}.
+     *
+     * @param maxSize  the run / max-frame-size pairs to inspect
+     * @param pageSize the frame size every pair is expected to report
+     */
+    static void assertMaxFrameSizesAreAllEqualsTo(List<RunAndMaxFrameSizePair> maxSize, int pageSize) {
+        for (RunAndMaxFrameSizePair pair : maxSize) {
+            assertTrue(pair.maxFrameSize == pageSize);
+        }
+    }
+
+    /**
+     * Supplies the sort run generator that the shared test cases of this class exercise.
+     *
+     * @param ctx              task context created for the test
+     * @param frameLimit       frame limit chosen by the test case
+     * @param numOfInputRecord number of records prepared as input (callers in this class pass the size of
+     *                         the expected key/value map)
+     * @return the generator that the prepared frames are fed into
+     * @throws HyracksDataException if the generator cannot be created
+     */
+    abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException;
+
+    /**
+     * Generates input data, pushes it through the generator under test and verifies the produced runs.
+     *
+     * @param pageSize      frame size used to create the task context
+     * @param frameLimit    frame limit handed to the generator under test
+     * @param numRuns       multiplier for the amount of input: at least
+     *                      {@code pageSize * frameLimit * numRuns} bytes of frames are generated
+     * @param minRecordSize lower bound for the random record size
+     * @param maxRecordSize upper bound for the random record size
+     * @param specialData   predefined records to include in the input, or {@code null}
+     * @return the runs reported by the generator after it was closed
+     * @throws HyracksDataException if data preparation, sorting or verification fails
+     */
+    protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
+            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        int minDataSize = pageSize * frameLimit * numRuns;
+
+        HashMap<Integer, String> expected = new HashMap<>();
+        List<IFrame> inputFrames = new ArrayList<>();
+        prepareData(ctx, inputFrames, minDataSize, minRecordSize, maxRecordSize, specialData, expected);
+
+        AbstractSortRunGenerator sorter = getSortRunGenerator(ctx, frameLimit, expected.size());
+        sorter.open();
+        for (IFrame input : inputFrames) {
+            sorter.nextFrame(input.getBuffer());
+        }
+        sorter.close();
+
+        matchResult(ctx, sorter.getRuns(), expected);
+        return sorter.getRuns();
+    }
+
+    /**
+     * Reads all runs back twice and checks both times that they are sorted and contain exactly the
+     * expected records: once through a plain {@link VSizeFrame} with a {@link FrameTupleAccessor}, and
+     * once through a {@link GroupVSizeFrame} with a {@link GroupFrameAccessor}.
+     *
+     * @param ctx          task context used to allocate the read frames
+     * @param runs         the runs to verify
+     * @param keyValuePair the expected records; not modified, each pass works on its own copy
+     * @throws HyracksDataException if reading a run fails
+     */
+    static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
+            Map<Integer, String> keyValuePair) throws HyracksDataException {
+        // Pass 1: single variable-size frame.
+        IFrame plainFrame = new VSizeFrame(ctx);
+        FrameTupleAccessor plainAccessor = new FrameTupleAccessor(RecordDesc);
+        HashMap<Integer, String> firstPassExpected = new HashMap<>(keyValuePair);
+        assertReadSorted(runs, plainAccessor, plainFrame, firstPassExpected);
+
+        // Pass 2: group frame sized after the largest frame any run reported.
+        HashMap<Integer, String> secondPassExpected = new HashMap<>(keyValuePair);
+        int largestFrameSize = 0;
+        for (RunAndMaxFrameSizePair pair : runs) {
+            if (pair.maxFrameSize > largestFrameSize) {
+                largestFrameSize = pair.maxFrameSize;
+            }
+        }
+        GroupVSizeFrame groupFrame = new GroupVSizeFrame(ctx, largestFrameSize);
+        GroupFrameAccessor groupAccessor = new GroupFrameAccessor(ctx.getInitialFrameSize(), RecordDesc);
+        assertReadSorted(runs, groupAccessor, groupFrame, secondPassExpected);
+    }
+
+    /**
+     * Verifies the tuples currently visible through {@code fta}: every tuple must match an entry of
+     * {@code keyValuePair} (which is removed once matched) and the keys must be non-decreasing,
+     * starting from {@code preKey}.
+     *
+     * @param fta          accessor positioned on the frame to check
+     * @param keyValuePair records still expected; matched entries are removed from this map
+     * @param preKey       the last key seen before this frame
+     * @return the last key seen in this frame, or {@code preKey} if the frame holds no tuple
+     * @throws HyracksDataException if a field cannot be deserialized
+     */
+    static int assertFTADataIsSorted(IFrameTupleAccessor fta, Map<Integer, String> keyValuePair, int preKey)
+            throws HyracksDataException {
+
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream di = new DataInputStream(bbis);
+        for (int i = 0; i < fta.getTupleCount(); i++) {
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 0) + fta.getFieldSlotsLength());
+            int key = (int) RecordDesc.getFields()[0].deserialize(di);
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
+            String value = (String) RecordDesc.getFields()[1].deserialize(di);
+
+            // remove() yields null for a key that is unknown or was already consumed; comparing from the
+            // deserialized side turns that case into an assertion failure instead of a NullPointerException.
+            String expected = keyValuePair.remove(key);
+            assertTrue("unexpected or duplicated tuple for key " + key, value.equals(expected));
+            assertTrue("key " + key + " is out of order after " + preKey, key >= preKey);
+            preKey = key;
+        }
+        return preKey;
+    }
+
+    /**
+     * Reads every run frame by frame and asserts that each run on its own is sorted and that, taken
+     * together, the runs contain every record of {@code keyValuePair}.
+     *
+     * @param runs         the runs to read; must not be empty
+     * @param fta          accessor that is reset onto each frame read
+     * @param frame        frame that the runs are read into
+     * @param keyValuePair expected records; entries are removed as they are matched and the map must be
+     *                     empty once all runs have been read
+     * @throws HyracksDataException if reading a run fails
+     */
+    static void assertReadSorted(List<RunAndMaxFrameSizePair> runs, IFrameTupleAccessor fta, IFrame frame,
+            Map<Integer, String> keyValuePair) throws HyracksDataException {
+        assertTrue(!runs.isEmpty());
+
+        for (RunAndMaxFrameSizePair pair : runs) {
+            // The ordering check restarts for every run.
+            int lastKey = Integer.MIN_VALUE;
+            pair.run.open();
+            for (boolean more = pair.run.nextFrame(frame); more; more = pair.run.nextFrame(frame)) {
+                fta.reset(frame.getBuffer());
+                lastKey = assertFTADataIsSorted(fta, keyValuePair, lastKey);
+            }
+            pair.run.close();
+        }
+
+        assertTrue(keyValuePair.isEmpty());
+    }
+
+    /**
+     * Builds the input frames for a sort test and records every generated key/value pair.
+     * <p>
+     * Each entry of {@code specialData} (if any) is written into a frame of its own, sized to hold that
+     * tuple. Afterwards random records whose keys are not yet in {@code keyValuePair} are appended
+     * until the accumulated size of the completed frames reaches {@code minDataSize}.
+     *
+     * @param ctx           task context used to allocate the frames
+     * @param frameList     receives the generated frames in generation order
+     * @param minDataSize   lower bound, in bytes of completed frames, for the amount of data generated
+     * @param minRecordSize lower bound handed to {@code generateRandomRecord}
+     * @param maxRecordSize upper bound handed to {@code generateRandomRecord}
+     * @param specialData   predefined records to include, or {@code null} for none
+     * @param keyValuePair  receives every record written into the frames
+     * @throws HyracksDataException if a frame cannot be allocated or a tuple cannot be built
+     */
+    static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
+            int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
+            throws HyracksDataException {
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        FrameTupleAppender writer = new FrameTupleAppender();
+        int generatedBytes = 0;
+
+        if (specialData != null) {
+            for (Map.Entry<Integer, String> special : specialData.entrySet()) {
+                builder.reset();
+                builder.addField(IntegerSerializerDeserializer.INSTANCE, special.getKey());
+                builder.addField(UTF8StringSerializerDeserializer.INSTANCE, special.getValue());
+
+                // Every special record gets a dedicated frame that is large enough for it.
+                VSizeFrame dedicated = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
+                        builder.getFieldEndOffsets().length, builder.getSize(), ctx.getInitialFrameSize()));
+                writer.reset(dedicated, true);
+                assertTrue(writer.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0,
+                        builder.getSize()));
+                frameList.add(dedicated);
+                generatedBytes += dedicated.getFrameSize();
+            }
+            keyValuePair.putAll(specialData);
+        }
+
+        VSizeFrame current = new VSizeFrame(ctx, ctx.getInitialFrameSize());
+        writer.reset(current, true);
+        while (generatedBytes < minDataSize) {
+            builder.reset();
+            int key = GRandom.nextInt(minDataSize + 1);
+            if (keyValuePair.containsKey(key)) {
+                // Keys must be unique; draw again.
+                continue;
+            }
+
+            String value = generateRandomRecord(minRecordSize, maxRecordSize);
+            builder.addField(IntegerSerializerDeserializer.INSTANCE, key);
+            builder.addField(UTF8StringSerializerDeserializer.INSTANCE, value);
+
+            boolean appended = writer.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0,
+                    builder.getSize());
+            if (!appended) {
+                // The current frame is complete: hand it over and start a new one sized for this tuple.
+                frameList.add(current);
+                generatedBytes += current.getFrameSize();
+                current = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
+                        builder.getFieldEndOffsets().length, builder.getSize(), ctx.getInitialFrameSize()));
+                writer.reset(current, true);
+                assertTrue(writer.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0,
+                        builder.getSize()));
+            }
+            keyValuePair.put(key, value);
+        }
+
+        // Hand over the trailing, partially filled frame as well.
+        if (writer.getTupleCount() > 0) {
+            frameList.add(current);
+        }
+    }
+
+    /**
+     * Produces a random string by passing a size drawn from {@code [minRecordSize, maxRecordSize]}
+     * to {@code generateRandomFixSizedString}.
+     *
+     * @param minRecordSize smallest size that may be requested
+     * @param maxRecordSize largest size that may be requested
+     * @return the generated string
+     * @throws HyracksDataException declared for callers; not thrown by the code visible here
+     */
+    static String generateRandomRecord(int minRecordSize, int maxRecordSize)
+            throws HyracksDataException {
+        int span = maxRecordSize - minRecordSize + 1;
+        int requestedSize = GRandom.nextInt(span) + minRecordSize;
+        return generateRandomFixSizedString(requestedSize);
+    }
+
+    /**
+     * Generates a string of exactly {@code size} random lower-case ASCII letters.
+     *
+     * @param size number of characters to generate; {@code 0} yields the empty string
+     * @return the generated string
+     */
+    static String generateRandomFixSizedString(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        // Count from 0 up to (but excluding) size so that exactly size characters are produced.
+        for (int i = 0; i < size; i++) {
+            sb.append((char) ('a' + GRandom.nextInt(26)));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Creates {@code times - 1} records with random integer keys whose values are requested with the
+     * sizes {@code pageSize * 1}, {@code pageSize * 2}, ..., {@code pageSize * (times - 1)}.
+     *
+     * @param pageSize base size that each value size is a multiple of
+     * @param times    exclusive upper bound of the multiplier
+     * @return the generated records; empty when {@code times <= 1}
+     */
+    static HashMap<Integer, String> generateBigObject(int pageSize, int times) {
+        HashMap<Integer, String> bigObjects = new HashMap<>(1);
+        int multiplier = 1;
+        while (multiplier < times) {
+            bigObjects.put(GRandom.nextInt(), generateRandomFixSizedString(pageSize * multiplier));
+            multiplier++;
+        }
+        return bigObjects;
+    }
+
+    /**
+     * Sorts records that are all requested with size {@code pageSize / 8} and expects every run to
+     * report a maximum frame size of one page.
+     */
+    @Test
+    public void testAllSmallRecords() throws HyracksDataException {
+        final int pageSize = 512;
+        final int frameLimit = 4;
+        final int numRuns = 2;
+        final int recordSize = pageSize / 8;
+
+        List<RunAndMaxFrameSizePair> runs = testSortRecords(pageSize, frameLimit, numRuns, recordSize, recordSize,
+                null);
+        assertMaxFrameSizesAreAllEqualsTo(runs, pageSize);
+    }
+
+    /**
+     * Sorts records requested with sizes between one page and 1.8 pages and expects every run to
+     * report a maximum frame size of two pages.
+     */
+    @Test
+    public void testAllLargeRecords() throws HyracksDataException {
+        final int pageSize = 2048;
+        final int frameLimit = 4;
+        final int numRuns = 2;
+        final int smallest = pageSize;
+        final int largest = (int) (pageSize * 1.8);
+
+        List<RunAndMaxFrameSizePair> runs = testSortRecords(pageSize, frameLimit, numRuns, smallest, largest, null);
+        assertMaxFrameSizesAreAllEqualsTo(runs, pageSize * 2);
+    }
+
+    /**
+     * Mixes small random records with predefined big objects and expects the largest frame size
+     * reported by any run to be {@code pageSize * (frameLimit - 1)}.
+     */
+    @Test
+    public void testMixedLargeRecords() throws HyracksDataException {
+        final int pageSize = 128;
+        final int frameLimit = 4;
+        final int numRuns = 4;
+        final int smallest = 20;
+        final int largest = pageSize / 2;
+
+        HashMap<Integer, String> bigObjects = generateBigObject(pageSize, frameLimit - 1);
+        List<RunAndMaxFrameSizePair> runs = testSortRecords(pageSize, frameLimit, numRuns, smallest, largest,
+                bigObjects);
+
+        int largestFrameSize = 0;
+        for (RunAndMaxFrameSizePair pair : runs) {
+            if (pair.maxFrameSize > largestFrameSize) {
+                largestFrameSize = pair.maxFrameSize;
+            }
+        }
+        assertTrue(largestFrameSize == pageSize * (frameLimit - 1));
+    }
+
+    /**
+     * Feeds big objects generated with {@code times == frameLimit} into the sorter and expects the
+     * run to fail with a {@link HyracksDataException}.
+     */
+    @Test(expected = HyracksDataException.class)
+    public void testTooBigRecordWillThrowException() throws HyracksDataException {
+        int pageSize = 1024;
+        int frameLimit = 8;
+        int numRuns = 8;
+        HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
+        int minRecordSize = 10;
+        int maxRecordSize = pageSize / 2;
+        // The result is deliberately ignored: the call is expected to throw before it returns.
+        testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
new file mode 100644
index 0000000..4d7558b
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+
+/**
+ * Runs the shared run-generator test cases against {@link ExternalSortRunGenerator} configured with
+ * {@link Algorithm#MERGE_SORT}.
+ */
+public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest {
+
+    /**
+     * Creates the external sort run generator; {@code numOfInputRecord} is not used by this variant.
+     */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        AbstractSortRunGenerator generator = new ExternalSortRunGenerator(ctx, SortFields, null,
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, frameLimit);
+        return generator;
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java
new file mode 100644
index 0000000..00eca70
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HeapSortRunGeneratorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+
+/**
+ * Runs the shared run-generator test cases against {@link HeapSortRunGenerator}.
+ */
+public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest {
+
+    /**
+     * Creates the heap sort run generator, passing the number of input records through to it.
+     */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        AbstractSortRunGenerator generator = new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+        return generator;
+    }
+
+    @Test
+    public void testTopK() {
+        // NOTE(review): placeholder with no assertions; it always passes and verifies nothing yet.
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java
new file mode 100644
index 0000000..f7ecd5e
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/HybridSortRunGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+
+/**
+ * Runs the shared run-generator test cases against {@link HybridTopKSortRunGenerator}.
+ * <p>
+ * NOTE(review): unlike its sibling test classes this class name does not end in {@code Test}; confirm
+ * that the build's test discovery still picks it up.
+ */
+public class HybridSortRunGenerator extends AbstractRunGeneratorTest {
+
+    /**
+     * Creates the hybrid top-K sort run generator, passing the number of input records through to it.
+     */
+    @Override
+    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        AbstractSortRunGenerator generator = new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+        return generator;
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java
new file mode 100644
index 0000000..d5355b8
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.ComparatorFactories;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.GRandom;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.RecordDesc;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.SortFields;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.generateRandomRecord;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.matchResult;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.prepareData;
+import static edu.uci.ics.hyracks.tests.unit.ExternalSortRunGeneratorTest.testUtils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+
+public class RunMergingFrameReaderTest {
+    // Comparator instances created from the two shared comparator factories, in the same order;
+    // handed to every RunMergingFrameReader built in this class.
+    static IBinaryComparator[] Comparators = new IBinaryComparator[] {
+            ComparatorFactories[0].createBinaryComparator(),
+            ComparatorFactories[1].createBinaryComparator(),
+    };
+
+    /**
+     * In-memory frame reader that serves randomly generated records in ascending key order.
+     * The data is (re)generated on every {@link #open()}.
+     */
+    static class TestFrameReader implements IFrameReader {
+
+        private final int pageSize;
+        private final int numFrames;
+        private final int minRecordSize;
+        private final int maxRecordSize;
+        // Sorted by key, which makes the iteration order the sorted order.
+        private TreeMap<Integer, String> result = new TreeMap<>();
+        // Frame size needed for the largest generated tuple; equals pageSize until open() is called.
+        int maxFrameSize;
+
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        FrameTupleAppender appender = new FrameTupleAppender();
+        private Iterator<Map.Entry<Integer, String>> iterator;
+        // Entry fetched from the iterator but not yet written into a frame, or null if there is none.
+        private Map.Entry<Integer, String> lastEntry;
+
+        TestFrameReader(int pageSize, int numFrames, int minRecordSize, int maxRecordSize) {
+            this.pageSize = pageSize;
+            this.maxFrameSize = pageSize;
+            this.numFrames = numFrames;
+            this.minRecordSize = minRecordSize;
+            this.maxRecordSize = maxRecordSize;
+        }
+
+        /** Discards any previous data, generates a fresh sorted data set and rewinds to its start. */
+        @Override
+        public void open() throws HyracksDataException {
+            result.clear();
+            int requestedBytes = numFrames * pageSize;
+            // The returned tuple size already includes the field slots, hence a field count of 0 here.
+            maxFrameSize = FrameHelper.calcAlignedFrameSizeToStore(0,
+                    prepareSortedData(requestedBytes, minRecordSize, maxRecordSize, null, result), pageSize);
+            iterator = result.entrySet().iterator();
+        }
+
+        /**
+         * Fills {@code frame} with as many of the remaining records as the appender accepts.
+         *
+         * @return {@code false} once every record has been served, {@code true} otherwise
+         */
+        @Override
+        public boolean nextFrame(IFrame frame) throws HyracksDataException {
+            if (lastEntry == null) {
+                if (!iterator.hasNext()) {
+                    return false;
+                }
+                lastEntry = iterator.next();
+            }
+
+            appender.reset(frame, true);
+            do {
+                tb.reset();
+                tb.addField(IntegerSerializerDeserializer.INSTANCE, lastEntry.getKey());
+                tb.addField(UTF8StringSerializerDeserializer.INSTANCE, lastEntry.getValue());
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    // Rejected by the appender: keep lastEntry so the next call starts with it.
+                    break;
+                }
+                lastEntry = iterator.hasNext() ? iterator.next() : null;
+            } while (lastEntry != null);
+
+            // Debugging aid: call printFrame(frame.getBuffer()) here to dump the frame content.
+            return true;
+        }
+
+        /** Pretty-prints the tuples stored in {@code buffer}; only useful while debugging. */
+        private void printFrame(ByteBuffer buffer) {
+            FrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
+            accessor.reset(buffer);
+            accessor.prettyPrint();
+        }
+
+        /** Nothing to release. */
+        @Override
+        public void close() throws HyracksDataException {
+        }
+    }
+
+    /**
+     * Fills {@code result} with the entries of {@code specialData} (if any) plus random records with
+     * unique keys until the accumulated tuple size reaches {@code minDataSize}. The record that
+     * crosses the limit is counted but not stored.
+     *
+     * @param minDataSize   accumulated tuple size, in bytes, at which generation stops
+     * @param minRecordSize lower bound handed to {@code generateRandomRecord}
+     * @param maxRecordSize upper bound handed to {@code generateRandomRecord}
+     * @param specialData   predefined records to include, or {@code null} for none
+     * @param result        receives the generated records
+     * @return the size of the largest tuple measured, field slots included
+     * @throws HyracksDataException if a tuple cannot be built
+     */
+    static int prepareSortedData(int minDataSize, int minRecordSize, int maxRecordSize,
+            Map<Integer, String> specialData, Map<Integer, String> result) throws HyracksDataException {
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+        int totalBytes = 0;
+        int largestTuple = 0;
+
+        if (specialData != null) {
+            for (Map.Entry<Integer, String> special : specialData.entrySet()) {
+                builder.reset();
+                builder.addField(IntegerSerializerDeserializer.INSTANCE, special.getKey());
+                builder.addField(UTF8StringSerializerDeserializer.INSTANCE, special.getValue());
+                // Tuple data plus 4 bytes per field slot.
+                int tupleBytes = builder.getSize() + builder.getFieldEndOffsets().length * 4;
+                totalBytes += tupleBytes;
+                largestTuple = Math.max(largestTuple, tupleBytes);
+            }
+            result.putAll(specialData);
+        }
+
+        while (totalBytes < minDataSize) {
+            String value = generateRandomRecord(minRecordSize, maxRecordSize);
+            builder.reset();
+            int key = GRandom.nextInt(totalBytes + 1);
+            if (result.containsKey(key)) {
+                // Keys must be unique; try again with a new draw.
+                continue;
+            }
+
+            builder.addField(IntegerSerializerDeserializer.INSTANCE, key);
+            builder.addField(UTF8StringSerializerDeserializer.INSTANCE, value);
+            int tupleBytes = builder.getSize() + builder.getFieldEndOffsets().length * 4;
+            totalBytes += tupleBytes;
+            largestTuple = Math.max(largestTuple, tupleBytes);
+            if (totalBytes < minDataSize) {
+                result.put(key, value);
+            }
+        }
+
+        return largestTuple;
+    }
+
+    /** Merging a single one-frame run must succeed and consume every expected record. */
+    @Test
+    public void testOnlyOneRunShouldMerge() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 1;
+        final int numFramesPerRun = 1;
+        final int smallest = pageSize / 10;
+        final int largest = pageSize / 8;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<Map<Integer, String>> expectedPerRun = new ArrayList<>(numRuns);
+        List<TestFrameReader> runReaders = new ArrayList<>(numRuns);
+        List<IFrame> runFrames = new ArrayList<>(numRuns);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, smallest, largest, runReaders,
+                runFrames, expectedPerRun);
+
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runReaders, runFrames, SortFields,
+                Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, expectedPerRun);
+    }
+
+    /** Merging two runs of two frames each must succeed and consume every expected record. */
+    @Test
+    public void testNormalRunMerge() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 2;
+        final int numFramesPerRun = 2;
+        final int smallest = pageSize / 10;
+        final int largest = pageSize / 8;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<Map<Integer, String>> expectedPerRun = new ArrayList<>(numRuns);
+        List<TestFrameReader> runReaders = new ArrayList<>(numRuns);
+        List<IFrame> runFrames = new ArrayList<>(numRuns);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, smallest, largest, runReaders,
+                runFrames, expectedPerRun);
+
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runReaders, runFrames, SortFields,
+                Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, expectedPerRun);
+    }
+
+    /**
+     * For a range of top-K values, merges two runs with a top-K limit and checks that the number of
+     * expected entries shrinks by exactly {@code topK} (presumably because the frame check removes
+     * every entry it sees - confirm against assertFrameIsSorted).
+     */
+    @Test
+    public void testNormalRunMergeWithTopK() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 2;
+        final int numFramesPerRun = 2;
+        final int smallest = pageSize / 10;
+        final int largest = pageSize / 8;
+        final int topKBound = pageSize * numRuns * numFramesPerRun / largest / 2;
+
+        for (int topK = 1; topK < topKBound; topK++) {
+            IHyracksTaskContext ctx = testUtils.create(pageSize);
+            List<Map<Integer, String>> expectedPerRun = new ArrayList<>(numRuns);
+            List<TestFrameReader> runReaders = new ArrayList<>(numRuns);
+            List<IFrame> runFrames = new ArrayList<>(numRuns);
+            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, smallest, largest, runReaders,
+                    runFrames, expectedPerRun);
+
+            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runReaders, runFrames, SortFields,
+                    Comparators, null, RecordDesc, topK);
+            int entriesBefore = testMergeSucceedInner(ctx, merger, expectedPerRun);
+
+            int entriesLeft = 0;
+            for (Map<Integer, String> remaining : expectedPerRun) {
+                entriesLeft += remaining.size();
+            }
+            assertEquals(topK + entriesLeft, entriesBefore);
+        }
+    }
+
+    /**
+     * Runs a full merge through {@code reader}, asserts that all expected key/value pairs were
+     * consumed, and then closes the reader.
+     *
+     * @param ctx             task context used to allocate the output frame
+     * @param reader          the merging reader under test; opened by the inner helper, closed here
+     * @param keyValueMapList expected records, one map per input run
+     * @throws HyracksDataException if reading from or closing the reader fails
+     */
+    private void testMergeSucceed(IHyracksTaskContext ctx, RunMergingFrameReader reader,
+            List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
+
+        testMergeSucceedInner(ctx, reader, keyValueMapList);
+        assertAllKeyValueIsConsumed(keyValueMapList);
+        // Note: the reader is only closed when the checks above pass.
+        reader.close();
+    }
+
+    /**
+     * Opens {@code reader}, replaces every expected map with a key-sorted copy, and checks each merged
+     * frame against those maps. The reader is left open.
+     *
+     * @param ctx             task context used to allocate the output frame
+     * @param reader          the merging reader under test
+     * @param keyValueMapList expected records, one map per input run; its elements are replaced by
+     *                        {@link TreeMap} copies
+     * @return the total number of expected entries counted before any frame was read
+     * @throws HyracksDataException if opening or reading the reader fails
+     */
+    private int testMergeSucceedInner(IHyracksTaskContext ctx, RunMergingFrameReader reader,
+            List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
+        IFrame outFrame = new VSizeFrame(ctx);
+        // Open first: the copies below must be taken after the reader has been opened.
+        reader.open();
+
+        int expectedEntries = 0;
+        int position = 0;
+        while (position < keyValueMapList.size()) {
+            keyValueMapList.set(position, new TreeMap<>(keyValueMapList.get(position)));
+            expectedEntries += keyValueMapList.get(position).size();
+            position++;
+        }
+
+        for (boolean more = reader.nextFrame(outFrame); more; more = reader.nextFrame(outFrame)) {
+            assertFrameIsSorted(outFrame, keyValueMapList);
+        }
+        return expectedEntries;
+    }
+
+    /**
+     * Merges three batches of two runs each - small records, page-sized records and records of two
+     * pages - through a single merging reader.
+     */
+    @Test
+    public void testOneLargeRunMerge() throws HyracksDataException {
+        final int pageSize = 64;
+        final int numRuns = 2;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        List<Map<Integer, String>> expectedPerRun = new ArrayList<>();
+        List<TestFrameReader> runReaders = new ArrayList<>();
+        List<IFrame> runFrames = new ArrayList<>();
+
+        // Batch 1: one frame per run, records well below one page.
+        prepareRandomInputRunList(ctx, pageSize, numRuns, 1, pageSize / 10, pageSize / 8, runReaders,
+                runFrames, expectedPerRun);
+        // Batch 2: four frames per run, record size equal to one page.
+        prepareRandomInputRunList(ctx, pageSize, numRuns, 4, pageSize, pageSize, runReaders, runFrames,
+                expectedPerRun);
+        // Batch 3: six frames per run, record size equal to two pages.
+        prepareRandomInputRunList(ctx, pageSize, numRuns, 6, pageSize * 2, pageSize * 2, runReaders,
+                runFrames, expectedPerRun);
+
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runReaders, runFrames, SortFields,
+                Comparators, null, RecordDesc);
+        testMergeSucceed(ctx, merger, expectedPerRun);
+    }
+
+    /**
+     * Pushes two batches of generated frames through an {@link ExternalSortRunGenerator}, checks the
+     * generated runs with {@code matchResult}, then merges those runs with a
+     * {@link RunMergingFrameReader} and verifies that the merged output is sorted and complete.
+     */
+    @Test
+    public void testRunFileReader() throws HyracksDataException {
+        final int pageSize = 128;
+        final int numRuns = 4;
+        final int firstBatchFramesPerRun = 4;
+        final int secondBatchFramesPerRun = 2;
+
+        IHyracksTaskContext ctx = testUtils.create(pageSize);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, firstBatchFramesPerRun);
+        runGenerator.open();
+
+        Map<Integer, String> keyValuePair = new HashMap<>();
+        List<IFrame> frameList = new ArrayList<>();
+
+        // First batch: minRecordSize = pageSize / 10, maxRecordSize = pageSize / 2.
+        prepareData(ctx, frameList, pageSize * firstBatchFramesPerRun * numRuns, pageSize / 10, pageSize / 2,
+                null, keyValuePair);
+        for (int idx = 0; idx < frameList.size(); idx++) {
+            runGenerator.nextFrame(frameList.get(idx).getBuffer());
+        }
+
+        // Second batch: both record size bounds equal the page size.
+        frameList.clear();
+        prepareData(ctx, frameList, pageSize * secondBatchFramesPerRun * numRuns, pageSize, pageSize,
+                null, keyValuePair);
+        for (int idx = 0; idx < frameList.size(); idx++) {
+            runGenerator.nextFrame(frameList.get(idx).getBuffer());
+        }
+        runGenerator.close();
+
+        // One input frame per generated run, sized from that run's recorded maxFrameSize.
+        List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
+        for (RunAndMaxFrameSizePair pair : runGenerator.getRuns()) {
+            inFrame.add(new GroupVSizeFrame(ctx, pair.maxFrameSize));
+        }
+        matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+
+        List<IFrameReader> runs = new ArrayList<>();
+        for (RunAndMaxFrameSizePair pair : runGenerator.getRuns()) {
+            runs.add(pair.run);
+        }
+        RunMergingFrameReader mergingReader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields,
+                Comparators, null, RecordDesc);
+
+        List<Map<Integer, String>> expected = Arrays.asList(keyValuePair);
+        IFrame outFrame = new VSizeFrame(ctx);
+        mergingReader.open();
+        boolean hasFrame = mergingReader.nextFrame(outFrame);
+        while (hasFrame) {
+            assertFrameIsSorted(outFrame, expected);
+            hasFrame = mergingReader.nextFrame(outFrame);
+        }
+        mergingReader.close();
+        assertAllKeyValueIsConsumed(expected);
+    }
+
+    /**
+     * Asserts that every map in {@code keyValueMapList} is empty, i.e. that no expected entry is left over.
+     *
+     * @param keyValueMapList expected-data maps to check
+     */
+    private void assertAllKeyValueIsConsumed(List<Map<Integer, String>> keyValueMapList) {
+        for (int idx = 0; idx < keyValueMapList.size(); idx++) {
+            boolean fullyConsumed = keyValueMapList.get(idx).isEmpty();
+            assertTrue(fullyConsumed);
+        }
+    }
+
+    /**
+     * Checks one output frame: every tuple (int key in field 0, string value in field 1) must match an
+     * entry in one of the expected maps, and keys must be non-decreasing within the frame.
+     * <p>
+     * A matched entry is removed from the map it was found in. The ordering check starts from
+     * {@link Integer#MIN_VALUE} on every call, so ordering across frames is not verified here.
+     *
+     * @param frame frame whose tuples are checked
+     * @param keyValueMapList expected key/value pairs; matched entries are removed
+     * @throws HyracksDataException propagated from deserializing the tuple fields
+     */
+    private void assertFrameIsSorted(IFrame frame, List<Map<Integer, String>> keyValueMapList)
+            throws HyracksDataException {
+        ByteBufferInputStream byteStream = new ByteBufferInputStream();
+        DataInputStream dataIn = new DataInputStream(byteStream);
+
+        FrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
+        accessor.reset(frame.getBuffer());
+
+        int previousKey = Integer.MIN_VALUE;
+        final int tupleCount = accessor.getTupleCount();
+        for (int t = 0; t < tupleCount; t++) {
+            // Offset shared by both fields; each field's own start offset is added below.
+            int dataStart = accessor.getTupleStartOffset(t) + accessor.getFieldSlotsLength();
+
+            byteStream.setByteBuffer(accessor.getBuffer(), dataStart + accessor.getFieldStartOffset(t, 0));
+            int key = (int) RecordDesc.getFields()[0].deserialize(dataIn);
+
+            byteStream.setByteBuffer(accessor.getBuffer(), dataStart + accessor.getFieldStartOffset(t, 1));
+            String value = (String) RecordDesc.getFields()[1].deserialize(dataIn);
+
+            boolean matched = false;
+            for (Map<Integer, String> expected : keyValueMapList) {
+                if (!expected.containsKey(key) || !expected.get(key).equals(value)) {
+                    continue;
+                }
+                matched = true;
+                expected.remove(key);
+                break;
+            }
+            assertTrue(matched);
+            assertTrue(previousKey <= key);
+            previousKey = key;
+        }
+    }
+
+    /**
+     * Appends {@code numRuns} new test runs to the given lists. For each run this adds a
+     * {@code TestFrameReader}, a {@code VSizeFrame} created with that reader's {@code maxFrameSize},
+     * and the reader's {@code result} map, so the three lists stay index-aligned.
+     *
+     * @param ctx task context used to allocate the frames
+     * @param pageSize passed through to each {@code TestFrameReader}
+     * @param numRuns number of readers to create
+     * @param numFramesPerRun passed through to each {@code TestFrameReader}
+     * @param minRecordSize passed through to each {@code TestFrameReader}
+     * @param maxRecordSize passed through to each {@code TestFrameReader}
+     * @param readerList receives the created readers
+     * @param frameList receives one frame per created reader
+     * @param keyValueMap receives each reader's {@code result} map
+     * @throws HyracksDataException propagated from the frame allocation
+     */
+    static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns,
+            int numFramesPerRun, int minRecordSize, int maxRecordSize,
+            List<TestFrameReader> readerList, List<IFrame> frameList, List<Map<Integer, String>> keyValueMap)
+            throws HyracksDataException {
+        int created = 0;
+        while (created < numRuns) {
+            TestFrameReader runReader = new TestFrameReader(pageSize, numFramesPerRun, minRecordSize,
+                    maxRecordSize);
+            readerList.add(runReader);
+            frameList.add(new VSizeFrame(ctx, runReader.maxFrameSize));
+            keyValueMap.add(runReader.result);
+            created++;
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java
new file mode 100644
index 0000000..ae0397b
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.unit;
+
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
+import static edu.uci.ics.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+
+/**
+ * Tests for the top-K sort run generators ({@link HeapSortRunGenerator} and
+ * {@link HybridTopKSortRunGenerator}).
+ * <p>
+ * The implemented tests feed a single frame of pre-ordered tuples to a sorter and expect that no run
+ * is generated and that flushing the in-memory sorter yields exactly the {@code topK} smallest keys.
+ */
+public class TopKRunGeneratorTest {
+
+    static final int PAGE_SIZE = 512;
+    static final int NUM_PAGES = 80;
+    static final int SORT_FRAME_LIMIT = 4;
+
+    /** Key order in which the input tuples are written into the input frame. */
+    enum ORDER {
+        INORDER,
+        REVERSE
+    }
+
+    /**
+     * Frame writer that checks the frames flushed by a sorter against an expected answer.
+     * Each frame is handed to {@code assertFTADataIsSorted} together with the answer map and the value
+     * that call returned for the previous frame; {@link #close()} requires the answer map to be empty.
+     */
+    public class InMemorySortDataValidator implements IFrameWriter {
+
+        InMemorySortDataValidator(Map<Integer, String> answer) {
+            this.answer = answer;
+        }
+
+        // Expected key/value pairs; must be empty by the time close() is called.
+        Map<Integer, String> answer;
+        FrameTupleAccessor accessor;
+        // Value returned by the previous assertFTADataIsSorted call; reset in open().
+        int preKey = Integer.MIN_VALUE;
+
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(RecordDesc);
+            preKey = Integer.MIN_VALUE;
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            preKey = assertFTADataIsSorted(accessor, answer, preKey);
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            // Nothing to do: the validator holds no resources.
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            assertTrue(answer.isEmpty());
+        }
+    }
+
+    /** Heap sorter, {@code topK = 1}, input keys in descending order: no run may be generated. */
+    @Test
+    public void testReverseOrderedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
+        int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
+    }
+
+    /** Heap sorter, {@code topK = SORT_FRAME_LIMIT}, input keys in ascending order: no run may be generated. */
+    @Test
+    public void testAlreadySortedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
+        int topK = SORT_FRAME_LIMIT;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+
+        testInMemoryOnly(ctx, topK, ORDER.INORDER, sorter);
+    }
+
+    /** Hybrid sorter, {@code topK = 1}, input keys in descending order: no run may be generated. */
+    @Test
+    public void testHybridTopKShouldNotGenerateAnyRuns() throws HyracksDataException {
+        int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
+    }
+
+    /**
+     * Placeholder: currently only constructs a {@link HybridTopKSortRunGenerator}; nothing is fed to it
+     * and nothing is asserted.
+     */
+    @Test
+    public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
+        int topK = 1;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        // TODO: feed enough data to force a flush and assert the behaviour named by this test;
+        // until then the test passes vacuously.
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
+                SortFields, null, ComparatorFactories, RecordDesc);
+
+    }
+
+    /**
+     * Shared scenario: generate key/value pairs, write them into one input frame in the requested key
+     * order, sort that frame, then check that no run was produced and that the in-memory sorter flushes
+     * exactly the expected top-K entries.
+     *
+     * @param ctx task context used for data generation
+     * @param topK number of entries the sorter is expected to retain; must be positive
+     * @param order key order of the tuples in the input frame
+     * @param sorter run generator under test
+     * @throws HyracksDataException propagated from data generation or the sorter
+     */
+    private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter)
+            throws HyracksDataException {
+        // A JUnit assertion is used so the precondition is enforced even when the JVM runs without -ea.
+        assertTrue("topK must be positive", topK > 0);
+
+        // The map's comparator decides the order in which prepareSortedData writes the tuples.
+        Map<Integer, String> keyValuePair;
+        switch (order) {
+            case INORDER:
+                keyValuePair = new TreeMap<>();
+                break;
+            case REVERSE:
+                keyValuePair = new TreeMap<>(Collections.reverseOrder());
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported order: " + order);
+        }
+
+        List<IFrame> frameList = new ArrayList<>();
+        int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5;
+        int minRecordSize = 16;
+        int maxRecordSize = 64;
+
+        // Only keyValuePair is used afterwards; frameList is passed because prepareData requires it.
+        // NOTE(review): presumably prepareData fills keyValuePair with the generated records - confirm.
+        AbstractRunGeneratorTest
+                .prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null, keyValuePair);
+
+        ByteBuffer buffer = prepareSortedData(keyValuePair);
+
+        Map<Integer, String> topKAnswer = getTopKAnswer(keyValuePair, topK);
+
+        doSort(sorter, buffer);
+
+        assertEquals(0, sorter.getRuns().size());
+        validateResult(sorter, topKAnswer);
+    }
+
+    /**
+     * Flushes the sorter's in-memory contents into an {@link InMemorySortDataValidator} built from
+     * {@code topKAnswer}; closing the validator asserts that the answer map is empty.
+     */
+    private void validateResult(AbstractSortRunGenerator sorter, Map<Integer, String> topKAnswer)
+            throws HyracksDataException {
+
+        InMemorySortDataValidator validator = new InMemorySortDataValidator(topKAnswer);
+        validator.open();
+        sorter.getSorter().flush(validator);
+        validator.close();
+    }
+
+    /** Runs the sorter over a single input buffer: open, one {@code nextFrame} call, close. */
+    private void doSort(AbstractSortRunGenerator sorter, ByteBuffer buffer) throws HyracksDataException {
+
+        sorter.open();
+        sorter.nextFrame(buffer);
+        sorter.close();
+    }
+
+    /**
+     * Returns the {@code topK} entries of {@code keyValuePair} with the smallest keys (fewer if the map
+     * holds fewer entries).
+     */
+    private Map<Integer, String> getTopKAnswer(Map<Integer, String> keyValuePair, int topK) {
+
+        // The argument is statically a Map, so this selects TreeMap(Map) and the copy is ordered by
+        // natural (ascending) key order, whatever comparator keyValuePair itself uses.
+        TreeMap<Integer, String> copy = new TreeMap<>(keyValuePair);
+
+        Map<Integer, String> answer = new TreeMap<>();
+        for (Map.Entry<Integer, String> entry : copy.entrySet()) {
+            if (answer.size() >= topK) {
+                break;
+            }
+            answer.put(entry.getKey(), entry.getValue());
+        }
+        return answer;
+    }
+
+    /**
+     * Writes every entry of {@code keyValuePair}, in the map's iteration order, as a two-field tuple
+     * into one buffer of {@code PAGE_SIZE * NUM_PAGES} bytes and returns that buffer.
+     */
+    private ByteBuffer prepareSortedData(Map<Integer, String> keyValuePair) throws HyracksDataException {
+        ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE * NUM_PAGES);
+        IFrame inputFrame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(inputFrame, true);
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(RecordDesc.getFieldCount());
+
+        for (Map.Entry<Integer, String> entry : keyValuePair.entrySet()) {
+            builder.reset();
+            builder.addField(SerDers[0], entry.getKey());
+            builder.addField(SerDers[1], entry.getValue());
+            // NOTE(review): the outcome of append is not checked; if it can signal a full frame, tuples
+            // would be dropped silently - confirm against FrameTupleAppender.
+            appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize());
+        }
+        return buffer;
+    }
+}