diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 925ff93..bb8223d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -39,18 +39,26 @@
     private static final long serialVersionUID = 1L;
 
     private final int[] sortFields;
-    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
 
     public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+        this(sortFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, projectionList);
+    }
+
+    public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
         super(projectionList);
         // Obs: the projection list is currently ignored.
         if (projectionList != null) {
             throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
         }
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
     }
 
@@ -67,8 +75,8 @@
                     IFrameBufferManager manager = new VariableFrameMemoryManager(
                             new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
                             FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
-                    frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
-                            comparatorFactories, outputRecordDesc);
+                    frameSorter = new FrameSorterMergeSort(ctx, manager, VariableFramePool.UNLIMITED_MEMORY, sortFields,
+                            keyNormalizerFactories, comparatorFactories, outputRecordDesc);
                 }
                 frameSorter.reset();
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 34ed142..cc4c1b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -64,6 +64,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
@@ -722,7 +723,8 @@
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
         // the algebricks op.
-        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, null,
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 },
+                (INormalizedKeyComputerFactory) null,
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 null);
         RecordDescriptor sortDesc = scannerDesc;
@@ -836,7 +838,8 @@
 
         // the sort (by nation id)
         RecordDescriptor sortDesc = scannerDesc;
-        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, null,
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 },
+                (INormalizedKeyComputerFactory) null,
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, null);
 
         // the group-by
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 2c79a4d..7bf8255 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -20,4 +20,9 @@
 
 public interface INormalizedKeyComputer {
     public int normalize(byte[] bytes, int start, int length);
+
+    default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+        int key = normalize(bytes, start, length);
+        normalizedKeys[keyStart] = key;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 2b7198b..901702e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -22,4 +22,21 @@
 
 public interface INormalizedKeyComputerFactory extends Serializable {
     public INormalizedKeyComputer createNormalizedKeyComputer();
+
+    /**
+     *
+     * @return The length of the normalized key in terms of integers
+     */
+    default int getNormalizedKeyLength() {
+        return 1;
+    }
+
+    /**
+     *
+     * @return Whether we can solely rely on this normalized key to complete comparison,
+     *         even when two normalized keys are equal
+     */
+    default boolean isDecisive() {
+        return false;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 5cfef28..41c0740 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -21,7 +21,6 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
@@ -32,8 +31,13 @@
             @Override
             public int normalize(byte[] bytes, int start, int length) {
                 int value = IntegerPointable.getInteger(bytes, start);
-                return value ^Integer.MIN_VALUE;
+                return value ^ Integer.MIN_VALUE;
             }
         };
     }
+
+    @Override
+    public boolean isDecisive() {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index 4ed11e6..5a59b5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -170,8 +170,8 @@
     @Override
     public ITuplePointerAccessor createTuplePointerAccessor() {
         return new AbstractTuplePointerAccessor() {
-            private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
-                    recordDescriptor);
+            private final IAppendDeletableFrameTupleAccessor bufferAccessor =
+                    new DeletableFrameTupleAppender(recordDescriptor);
 
             @Override
             IFrameTupleAccessor getInnerAccessor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index 3f10f50..0b915dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -50,16 +50,25 @@
             int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
-        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT);
+    }
+
+    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
+            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
+        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, keyNormalizerFactories, comparatorFactories,
                 aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT);
     }
 
     public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc,
-            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDesc, Algorithm alg, EnumFreeSlotPolicy policy) throws HyracksDataException {
-        super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, inputRecordDesc, alg, policy,
-                framesLimit);
+        super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, inputRecordDesc, alg, policy, framesLimit);
 
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
@@ -70,20 +79,20 @@
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortGroupByRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(ExternalSortGroupByRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
     @Override
     protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
         //create group-by comparators
-        IBinaryComparator[] comparators = new IBinaryComparator[Math
-                .min(groupFields.length, comparatorFactories.length)];
+        IBinaryComparator[] comparators =
+                new IBinaryComparator[Math.min(groupFields.length, comparatorFactories.length)];
         for (int i = 0; i < comparators.length; i++) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
-                this.inRecordDesc, this.outRecordDesc, writer, true);
+        return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, this.inRecordDesc,
+                this.outRecordDesc, writer, true);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index da5b4a8..23e47f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -55,39 +55,74 @@
     private final RecordDescriptor partialAggRecordDesc;
     private final RecordDescriptor outputRecordDesc;
     private final boolean finalStage;
-    private Algorithm alg = Algorithm.MERGE_SORT;
+    private static final Algorithm ALG = Algorithm.MERGE_SORT;
 
     /**
      * @param spec
-     *            , the Hyracks job specification
+     *            the Hyracks job specification
      * @param framesLimit
-     *            , the frame limit for this operator
+     *            the frame limit for this operator
      * @param sortFields
-     *            , the fields to sort
+     *            the fields to sort
      * @param groupFields
-     *            , the fields to group, which can be a prefix subset of sortFields
+     *            the fields to group, which can be a prefix subset of sortFields
      * @param firstKeyNormalizerFactory
-     *            , the normalized key computer factory of the first key
+     *            the normalized key computer factory of the first key
      * @param comparatorFactories
-     *            , the comparator factories of sort keys
+     *            the comparator factories of sort keys
      * @param partialAggregatorFactory
-     *            , for aggregating the input of this operator
+     *            for aggregating the input of this operator
      * @param mergeAggregatorFactory
-     *            , for aggregating the intermediate data of this operator
+     *            for aggregating the intermediate data of this operator
      * @param partialAggRecordDesc
-     *            , the record descriptor of intermediate data
+     *            the record descriptor of intermediate data
      * @param outRecordDesc
-     *            , the record descriptor of output data
+     *            the record descriptor of output data
      * @param finalStage
-     *            , whether the operator is used for final stage aggregation
+     *            whether the operator is used for final stage aggregation
      */
     public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
             IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, boolean finalStage) {
+        this(spec, framesLimit, sortFields, groupFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc,
+                outRecordDesc, finalStage);
+    }
 
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, outRecordDesc);
+    /**
+     * @param spec
+     *            the Hyracks job specification
+     * @param framesLimit
+     *            the frame limit for this operator
+     * @param sortFields
+     *            the fields to sort
+     * @param groupFields
+     *            the fields to group, which can be a prefix subset of sortFields
+     * @param keyNormalizerFactories
+     *            the normalized key computer factories for the prefix the sortFields
+     * @param comparatorFactories
+     *            the comparator factories of sort keys
+     * @param partialAggregatorFactory
+     *            for aggregating the input of this operator
+     * @param mergeAggregatorFactory
+     *            for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc
+     *            the record descriptor of intermediate data
+     * @param outRecordDesc
+     *            the record descriptor of output data
+     * @param finalStage
+     *            whether the operator is used for final stage aggregation
+     */
+    public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+            IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, boolean finalStage) {
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc);
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
         }
@@ -110,8 +145,8 @@
                     IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
                 return new ExternalSortGroupByRunGenerator(ctx, sortFields,
                         recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
-                        groupFields, firstKeyNormalizerFactory, comparatorFactories, partialAggregatorFactory,
-                        partialAggRecordDesc, alg);
+                        groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
+                        partialAggRecordDesc, ALG);
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
index 7c7bfec..a8cc93b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
@@ -39,24 +39,24 @@
     protected final int maxSortFrames;
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
-                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+                framesLimit);
     }
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
-                    throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         super();
         this.ctx = ctx;
         maxSortFrames = framesLimit - 1;
@@ -65,11 +65,11 @@
         IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
                 new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy);
         if (alg == Algorithm.MERGE_SORT) {
-            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDesc, outputLimit);
+            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, maxSortFrames, sortFields,
+                    keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit);
         } else {
-            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDesc, outputLimit);
+            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, maxSortFrames, sortFields,
+                    keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 77d5d49..6c061ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -39,46 +39,90 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public abstract class AbstractFrameSorter implements IFrameSorter {
 
     protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName());
-    static final int PTR_SIZE = 4;
-    static final int ID_FRAMEID = 0;
-    static final int ID_TUPLE_START = 1;
-    static final int ID_TUPLE_END = 2;
-    static final int ID_NORMAL_KEY = 3;
+    protected static final int ID_FRAME_ID = 0;
+    protected static final int ID_TUPLE_START = 1;
+    protected static final int ID_TUPLE_END = 2;
+    protected static final int ID_NORMALIZED_KEY = 3;
+
+    // the length of each normalized key (in terms of integers)
+    protected final int[] normalizedKeyLength;
+    // the total length of the normalized key (in term of integers)
+    protected final int normalizedKeyTotalLength;
+    // whether the normalized keys can be used to decide orders, even when normalized keys are the same
+    protected final boolean normalizedKeysDecisive;
+
+    protected final int ptrSize;
 
     protected final int[] sortFields;
     protected final IBinaryComparator[] comparators;
-    protected final INormalizedKeyComputer nkc;
+    protected final INormalizedKeyComputer[] nkcs;
     protected final IFrameBufferManager bufferManager;
     protected final FrameTupleAccessor inputTupleAccessor;
     protected final IFrameTupleAppender outputAppender;
     protected final IFrame outputFrame;
     protected final int outputLimit;
 
+    protected final long maxSortMemory;
+    protected long totalMemoryUsed;
     protected int[] tPointers;
+    protected final int[] tmpPointer;
     protected int tupleCount;
 
-    private FrameTupleAccessor fta2;
-    private BufferInfo info = new BufferInfo(null, -1, -1);
+    private final FrameTupleAccessor fta2;
+    private final BufferInfo info = new BufferInfo(null, -1, -1);
 
-    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit)
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
             throws HyracksDataException {
         this.bufferManager = bufferManager;
+        if (maxSortFrames == VariableFramePool.UNLIMITED_MEMORY) {
+            this.maxSortMemory = Long.MAX_VALUE;
+        } else {
+            this.maxSortMemory = (long) ctx.getInitialFrameSize() * maxSortFrames;
+        }
         this.sortFields = sortFields;
-        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+        int runningNormalizedKeyTotalLength = 0;
+
+        if (normalizedKeyComputerFactories != null) {
+            int decisivePrefixLength = getDecisivePrefixLength(normalizedKeyComputerFactories);
+
+            // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
+            // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
+            // computing unncessary normalized keys
+            int normalizedKeys = decisivePrefixLength < normalizedKeyComputerFactories.length ? decisivePrefixLength + 1
+                    : decisivePrefixLength;
+            this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+            this.normalizedKeyLength = new int[normalizedKeys];
+
+            for (int i = 0; i < normalizedKeys; i++) {
+                this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
+                this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+                runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+            }
+            this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length;
+        } else {
+            this.nkcs = null;
+            this.normalizedKeyLength = null;
+            this.normalizedKeysDecisive = false;
+        }
+        this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
+        this.ptrSize = ID_NORMALIZED_KEY + normalizedKeyTotalLength;
         this.comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,17 +132,24 @@
         this.outputFrame = new VSizeFrame(ctx);
         this.outputLimit = outputLimit;
         this.fta2 = new FrameTupleAccessor(recordDescriptor);
+        this.tmpPointer = new int[ptrSize];
     }
 
     @Override
     public void reset() throws HyracksDataException {
         this.tupleCount = 0;
+        this.totalMemoryUsed = 0;
         this.bufferManager.reset();
     }
 
     @Override
     public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
-        if (bufferManager.insertFrame(inputBuffer) >= 0) {
+        inputTupleAccessor.reset(inputBuffer);
+        long requiredMemory = getRequiredMemory(inputTupleAccessor);
+        if (totalMemoryUsed + requiredMemory <= maxSortMemory && bufferManager.insertFrame(inputBuffer) >= 0) {
+            // we have enough memory
+            totalMemoryUsed += requiredMemory;
+            tupleCount += inputTupleAccessor.getTupleCount();
             return true;
         }
         if (getFrameCount() == 0) {
@@ -108,36 +159,41 @@
         return false;
     }
 
+    protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+        return (long) frameAccessor.getBuffer().capacity() + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES;
+    }
+
     @Override
     public void sort() throws HyracksDataException {
-        tupleCount = 0;
-        for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
-            bufferManager.getFrame(i, info);
-            inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
-            tupleCount += inputTupleAccessor.getTupleCount();
-        }
-        if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
-            tPointers = new int[tupleCount * PTR_SIZE];
+        if (tPointers == null || tPointers.length < tupleCount * ptrSize) {
+            tPointers = new int[tupleCount * ptrSize];
         }
         int ptr = 0;
-        int sfIdx = sortFields[0];
         for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
             bufferManager.getFrame(i, info);
             inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
             int tCount = inputTupleAccessor.getTupleCount();
             byte[] array = inputTupleAccessor.getBuffer().array();
-            for (int j = 0; j < tCount; ++j) {
+            int fieldSlotsLength = inputTupleAccessor.getFieldSlotsLength();
+            for (int j = 0; j < tCount; ++j, ++ptr) {
                 int tStart = inputTupleAccessor.getTupleStartOffset(j);
                 int tEnd = inputTupleAccessor.getTupleEndOffset(j);
-                tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
-                tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
-                tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
-                int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength();
-                tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
-                        nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
-                ++ptr;
+                tPointers[ptr * ptrSize + ID_FRAME_ID] = i;
+                tPointers[ptr * ptrSize + ID_TUPLE_START] = tStart;
+                tPointers[ptr * ptrSize + ID_TUPLE_END] = tEnd;
+                if (nkcs == null) {
+                    continue;
+                }
+                int keyPos = ptr * ptrSize + ID_NORMALIZED_KEY;
+                for (int k = 0; k < nkcs.length; k++) {
+                    int sortField = sortFields[k];
+                    int fieldStartOffsetRel = inputTupleAccessor.getFieldStartOffset(j, sortField);
+                    int fieldEndOffsetRel = inputTupleAccessor.getFieldEndOffset(j, sortField);
+                    int fieldStartOffset = fieldStartOffsetRel + tStart + fieldSlotsLength;
+                    nkcs[k].normalize(array, fieldStartOffset, fieldEndOffsetRel - fieldStartOffsetRel, tPointers,
+                            keyPos);
+                    keyPos += normalizedKeyLength[k];
+                }
             }
         }
         if (tupleCount > 0) {
@@ -164,9 +220,9 @@
         int limit = Math.min(tupleCount, outputLimit);
         int io = 0;
         for (int ptr = 0; ptr < limit; ++ptr) {
-            int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
-            int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
-            int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
+            int i = tPointers[ptr * ptrSize + ID_FRAME_ID];
+            int tStart = tPointers[ptr * ptrSize + ID_TUPLE_START];
+            int tEnd = tPointers[ptr * ptrSize + ID_TUPLE_END];
             bufferManager.getFrame(i, info);
             inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
             int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd);
@@ -185,19 +241,23 @@
     }
 
     protected final int compare(int tp1, int tp2) throws HyracksDataException {
-        int i1 = tPointers[tp1 * 4 + ID_FRAMEID];
-        int j1 = tPointers[tp1 * 4 + ID_TUPLE_START];
-        int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY];
+        return compare(tPointers, tp1, tPointers, tp2);
+    }
 
-        int tp2i = tPointers[tp2 * 4 + ID_FRAMEID];
-        int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START];
-        int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY];
-
-        if (v1 != tp2v) {
-            return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
+    protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException {
+        if (nkcs != null) {
+            int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
+                    tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
+            if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
+                return cmpNormalizedKey;
+            }
         }
-        int i2 = tp2i;
-        int j2 = tp2j;
+
+        int i1 = tPointers1[tp1 * ptrSize + ID_FRAME_ID];
+        int j1 = tPointers1[tp1 * ptrSize + ID_TUPLE_START];
+        int i2 = tPointers2[tp2 * ptrSize + ID_FRAME_ID];
+        int j2 = tPointers2[tp2 * ptrSize + ID_TUPLE_START];
+
         bufferManager.getFrame(i1, info);
         byte[] b1 = info.getBuffer().array();
         inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength());
@@ -223,6 +283,43 @@
         return 0;
     }
 
+    public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
+        for (int i = 0; i < length; i++) {
+            int key1 = keys1[start1 + i];
+            int key2 = keys2[start2 + i];
+            if (key1 != key2) {
+                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
+            }
+        }
+        return 0;
+    }
+
+    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
+        if (keyNormalizerFactories == null) {
+            return 0;
+        }
+        for (int i = 0; i < keyNormalizerFactories.length; i++) {
+            if (!keyNormalizerFactories[i].isDecisive()) {
+                return i;
+            }
+        }
+        return keyNormalizerFactories.length;
+    }
+
+    protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+        System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+        System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
+        System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+    }
+
+    protected void copy(int src[], int srcPos, int dest[], int destPos) {
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
+    }
+
+    protected void copy(int src[], int srcPos, int dest[], int destPos, int n) {
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
+    }
+
     @Override
     public void close() {
         tupleCount = 0;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 1cd5fc3..602157f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -56,17 +56,17 @@
     protected static final int MERGE_ACTIVITY_ID = 1;
 
     protected final int[] sortFields;
-    protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
     protected final IBinaryComparatorFactory[] comparatorFactories;
     protected final int framesLimit;
 
     public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         outRecDescs[0] = recordDescriptor;
     }
@@ -174,8 +174,8 @@
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
-                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
-                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+                    INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? null
+                            : keyNormalizerFactories[0].createNormalizedKeyComputer();
                     AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter,
                             runs, comparators, nmkComputer, framesLimit);
                     merger.process();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 1b66ccf..b58d4c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,21 +43,31 @@
     private final int outputLimit;
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg,
                 EnumFreeSlotPolicy.LAST_FIT);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
-        this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor);
+        this(spec, framesLimit, sortFields, (INormalizedKeyComputerFactory[]) null, comparatorFactories,
+                recordDescriptor);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+        this(spec, framesLimit, sortFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, recordDescriptor, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
+    }
+
+    public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor,
                 Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
     }
 
@@ -69,7 +79,7 @@
             @Override
             protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
-                return new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories,
                         outRecDescs[0], alg, policy, framesLimit, outputLimit);
             }
         };
@@ -92,16 +102,16 @@
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
-                policy, Integer.MAX_VALUE);
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg, policy,
+                Integer.MAX_VALUE);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index b451b1c..785b94e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -31,32 +31,32 @@
 public class ExternalSortRunGenerator extends AbstractExternalSortRunGenerator {
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
-                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+                framesLimit);
     }
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
-                    throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
-                    throws HyracksDataException {
-        super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit,
                 outputLimit);
     }
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                .createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index ed28560..260b665 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -23,24 +23,27 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 
 public class FrameSorterMergeSort extends AbstractFrameSorter {
 
     private int[] tPointersTemp;
 
-    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
-        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                outputLimit);
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
+            throws HyracksDataException {
+        super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, outputLimit);
     }
 
     @Override
@@ -52,6 +55,11 @@
     }
 
     @Override
+    protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+        return super.getRequiredMemory(frameAccessor) + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES;
+    }
+
+    @Override
     public void close() {
         super.close();
         tPointersTemp = null;
@@ -68,7 +76,7 @@
                 if (next < end) {
                     merge(i, next, step, Math.min(step, end - next));
                 } else {
-                    System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
+                    copy(tPointers, i, tPointersTemp, i, end - i);
                 }
             }
             /** prepare next phase merge */
@@ -91,29 +99,21 @@
         while (pos1 <= end1 && pos2 <= end2) {
             int cmp = compare(pos1, pos2);
             if (cmp <= 0) {
-                copy(pos1, targetPos);
+                copy(tPointers, pos1, tPointersTemp, targetPos);
                 pos1++;
             } else {
-                copy(pos2, targetPos);
+                copy(tPointers, pos2, tPointersTemp, targetPos);
                 pos2++;
             }
             targetPos++;
         }
         if (pos1 <= end1) {
             int rest = end1 - pos1 + 1;
-            System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
+            copy(tPointers, pos1, tPointersTemp, targetPos, rest);
         }
         if (pos2 <= end2) {
             int rest = end2 - pos2 + 1;
-            System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
+            copy(tPointers, pos2, tPointersTemp, targetPos, rest);
         }
     }
-
-    private void copy(int src, int dest) {
-        tPointersTemp[dest * 4 + ID_FRAMEID] = tPointers[src * 4 + ID_FRAMEID];
-        tPointersTemp[dest * 4 + ID_TUPLE_START] = tPointers[src * 4 + ID_TUPLE_START];
-        tPointersTemp[dest * 4 + ID_TUPLE_END] = tPointers[src * 4 + ID_TUPLE_END];
-        tPointersTemp[dest * 4 + ID_NORMAL_KEY] = tPointers[src * 4 + ID_NORMAL_KEY];
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index cf864f6..486bc7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -27,18 +27,20 @@
 
 public class FrameSorterQuickSort extends AbstractFrameSorter {
 
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
-        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                outputLimit);
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
+            throws HyracksDataException {
+        super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, outputLimit);
     }
 
     @Override
@@ -60,7 +62,7 @@
                     break;
                 }
                 if (cmp == 0) {
-                    swap(tPointers, a++, b);
+                    swap(tPointers, a++, tPointers, b);
                 }
                 ++b;
             }
@@ -70,13 +72,13 @@
                     break;
                 }
                 if (cmp == 0) {
-                    swap(tPointers, c, d--);
+                    swap(tPointers, c, tPointers, d--);
                 }
                 --c;
             }
             if (b > c)
                 break;
-            swap(tPointers, b++, c--);
+            swap(tPointers, b++, tPointers, c--);
         }
 
         int s;
@@ -94,17 +96,9 @@
         }
     }
 
-    private void swap(int x[], int a, int b) {
-        for (int i = 0; i < 4; ++i) {
-            int t = x[a * 4 + i];
-            x[a * 4 + i] = x[b * 4 + i];
-            x[b * 4 + i] = t;
-        }
-    }
-
     private void vecswap(int x[], int a, int b, int n) {
         for (int i = 0; i < n; i++, a++, b++) {
-            swap(x, a, b);
+            swap(x, a, x, b);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index a058624..1578975 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -31,31 +31,31 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 
 public class HeapSortRunGenerator extends AbstractSortRunGenerator {
     protected final IHyracksTaskContext ctx;
     protected final int frameLimit;
     protected final int topK;
     protected final int[] sortFields;
-    protected final INormalizedKeyComputerFactory nmkFactory;
+    protected final INormalizedKeyComputerFactory[] nmkFactories;
     protected final IBinaryComparatorFactory[] comparatorFactories;
     protected final RecordDescriptor recordDescriptor;
     protected ITupleSorter tupleSorter;
     protected IFrameTupleAccessor inAccessor;
 
     public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super();
         this.ctx = ctx;
         this.frameLimit = frameLimit;
         this.topK = topK;
         this.sortFields = sortFields;
-        this.nmkFactory = firstKeyNormalizerFactory;
+        this.nmkFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         this.inAccessor = new FrameTupleAccessor(recordDescriptor);
         this.recordDescriptor = recordDescriptor;
@@ -64,8 +64,9 @@
     @Override
     public void open() throws HyracksDataException {
         IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize());
-        IDeletableTupleBufferManager bufferManager = new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
-        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory, comparatorFactories);
+        IDeletableTupleBufferManager bufferManager =
+                new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
+        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactories, comparatorFactories);
         super.open();
     }
 
@@ -76,8 +77,8 @@
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                ctx.getJobletContext().createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 4311128..80b36ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -43,9 +43,9 @@
     private int tupleSorterFlushedTimes = 0;
 
     public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        super(ctx, frameLimit, topK, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
     }
 
     @Override
@@ -60,8 +60,8 @@
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                .createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                ctx.getJobletContext().createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
@@ -101,8 +101,8 @@
                         new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
                         FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT,
                                 frameLimit - 1));
-                frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory, comparatorFactories,
-                        recordDescriptor, topK);
+                frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories,
+                        comparatorFactories, recordDescriptor, topK);
                 if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("create frameSorter");
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 996101b..adc0d5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -53,8 +53,8 @@
     private static final int MERGE_ACTIVITY_ID = 1;
 
     private final int[] sortFields;
-    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
 
     public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
@@ -62,11 +62,11 @@
     }
 
     public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         outRecDescs[0] = recordDescriptor;
     }
@@ -123,8 +123,9 @@
                             new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
                             FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
 
-                    state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields,
-                            firstKeyNormalizerFactory, comparatorFactories, outRecDescs[0]);
+                    state.frameSorter =
+                            new FrameSorterMergeSort(ctx, frameBufferManager, VariableFramePool.UNLIMITED_MEMORY,
+                                    sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
                     state.frameSorter.reset();
                 }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 988eea3..a90d48f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -41,7 +41,16 @@
     public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        this(spec, framesLimit, topK, sortFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, recordDescriptor);
+    }
+
+    public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
         this.topK = topK;
     }
 
@@ -53,7 +62,7 @@
             @Override
             protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) {
-                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, firstKeyNormalizerFactory,
+                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories,
                         comparatorFactories, outRecDescs[0]);
 
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index c473819..980857a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -55,18 +55,19 @@
     }
 
     class HeapEntry implements IResetableComparable<HeapEntry> {
-        int nmk;
+        int[] nmk;
         TuplePointer tuplePointer;
 
         public HeapEntry() {
             tuplePointer = new TuplePointer();
-            nmk = 0;
+            nmk = new int[normalizedKeyTotalLength];
         }
 
         @Override
         public int compareTo(HeapEntry o) {
-            if (nmk != o.nmk) {
-                return ((((long) nmk) & 0xffffffffL) < (((long) o.nmk) & 0xffffffffL)) ? -1 : 1;
+            int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+            if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+                return cmpNormalizedKey;
             }
             bufferAccessor1.reset(tuplePointer);
             bufferAccessor2.reset(o.tuplePointer);
@@ -93,13 +94,15 @@
             return 0;
         }
 
-        public void reset(int nmkey) {
-            nmk = nmkey;
+        public void reset(int[] nmkey) {
+            if (normalizedKeyTotalLength > 0) {
+                System.arraycopy(nmkey, 0, nmk, 0, normalizedKeyTotalLength);
+            }
         }
 
         @Override
         public void reset(HeapEntry other) {
-            nmk = other.nmk;
+            reset(other.nmk);
             tuplePointer.reset(other.tuplePointer);
         }
     }
@@ -111,19 +114,23 @@
     private final FrameTupleAppender outputAppender;
     private final IFrame outputFrame;
     private final int[] sortFields;
-    private final INormalizedKeyComputer nkc;
+    private final INormalizedKeyComputer[] nkcs;
+    private final boolean normalizedKeyDecisive;
+    private final int[] normalizedKeyLength;
+    private final int normalizedKeyTotalLength;
     private final IBinaryComparator[] comparators;
 
-    private HeapEntry maxEntry;
-    private HeapEntry newEntry;
+    private final HeapEntry maxEntry;
+    private final HeapEntry newEntry;
 
     private MaxHeap heap;
     private boolean isSorted;
 
+    private final int[] nmk;
+
     public TupleSorterHeapSort(IHyracksTaskContext ctx, IDeletableTupleBufferManager bufferManager, int topK,
-            int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories)
-            throws HyracksDataException {
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException {
         this.bufferManager = bufferManager;
         this.bufferAccessor1 = bufferManager.createTuplePointerAccessor();
         this.bufferAccessor2 = bufferManager.createTuplePointerAccessor();
@@ -131,7 +138,31 @@
         this.outputFrame = new VSizeFrame(ctx);
         this.outputAppender = new FrameTupleAppender();
         this.sortFields = sortFields;
-        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+        int runningNormalizedKeyTotalLength = 0;
+        if (keyNormalizerFactories != null) {
+            int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+
+            // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
+            // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
+            // computing unncessary normalized keys
+            int normalizedKeys = decisivePrefixLength < keyNormalizerFactories.length ? decisivePrefixLength + 1
+                    : decisivePrefixLength;
+            this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+            this.normalizedKeyLength = new int[normalizedKeys];
+
+            for (int i = 0; i < normalizedKeys; i++) {
+                this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
+                this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+                runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+            }
+            this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
+        } else {
+            this.nkcs = null;
+            this.normalizedKeyLength = null;
+            this.normalizedKeyDecisive = false;
+        }
+        this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
         this.comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -141,6 +172,7 @@
         this.maxEntry = new HeapEntry();
         this.newEntry = new HeapEntry();
         this.isSorted = false;
+        this.nmk = new int[runningNormalizedKeyTotalLength];
     }
 
     @Override
@@ -154,7 +186,7 @@
             throw new HyracksDataException(
                     "The Heap haven't be reset after sorting, the order of using this class is not correct.");
         }
-        int nmkey = getPNK(frameTupleAccessor, index);
+        int[] nmkey = getPNK(frameTupleAccessor, index);
         if (heap.getNumEntries() >= topK) {
             heap.peekMax(maxEntry);
             if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) {
@@ -175,20 +207,29 @@
         return true;
     }
 
-    private int getPNK(IFrameTupleAccessor fta, int tIx) {
-        if (nkc == null) {
-            return 0;
+    private int[] getPNK(IFrameTupleAccessor fta, int tIx) {
+        if (nkcs == null) {
+            return nmk;
         }
-        int sfIdx = sortFields[0];
-        return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx),
-                fta.getFieldLength(tIx, sfIdx));
+        int keyPos = 0;
+        byte[] buffer = fta.getBuffer().array();
+        for (int i = 0; i < nkcs.length; i++) {
+            int sfIdx = sortFields[i];
+            nkcs[i].normalize(buffer, fta.getAbsoluteFieldStartOffset(tIx, sfIdx), fta.getFieldLength(tIx, sfIdx), nmk,
+                    keyPos);
+            keyPos += normalizedKeyLength[i];
+        }
+        return nmk;
     }
 
-    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry maxEntry)
+    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry)
             throws HyracksDataException {
-        if (nmkey != maxEntry.nmk) {
-            return ((((long) nmkey) & 0xffffffffL) < (((long) maxEntry.nmk) & 0xffffffffL)) ? -1 : 1;
+        int cmpNormalizedKey =
+                AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+        if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+            return cmpNormalizedKey;
         }
+
         bufferAccessor2.reset(maxEntry.tuplePointer);
         byte[] b1 = frameTupleAccessor.getBuffer().array();
         byte[] b2 = bufferAccessor2.getBuffer().array();
@@ -254,9 +295,8 @@
         for (int i = 0; i < numEntries; i++) {
             HeapEntry minEntry = (HeapEntry) entries[i];
             bufferAccessor1.reset(minEntry.tuplePointer);
-            int flushed = FrameUtils
-                    .appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
-                            bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
+            int flushed = FrameUtils.appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
+                    bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
             if (flushed > 0) {
                 maxFrameSize = Math.max(maxFrameSize, flushed);
                 io++;
@@ -265,8 +305,7 @@
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
         outputAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(
-                    "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
+            LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
         }
         return maxFrameSize;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 160336a..56bf853 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -58,31 +59,33 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part1.tbl"),
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part2.tbl") };
+                new ManagedFileSplit(NC1_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+                new ManagedFileSplit(NC2_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
+        RecordDescriptor ordersDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
                 new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         int outputLimit = 5; // larger than the total record numbers.
-        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
-                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+                (INormalizedKeyComputerFactory) null,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
@@ -90,23 +93,21 @@
         spec.addResultSetId(rsId);
 
         FileSplit fs = createFile(nc1);
-        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
-                new FileSplit[] { fs });
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { fs });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(
-                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
-                        1, 0 }, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
-                        new IBinaryComparatorFactory[] {
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
         System.out.println("Result write into :" + fs.getPath() + " in node: " + fs.getNodeName());
@@ -122,31 +123,33 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part1.tbl"),
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part2.tbl") };
+                new ManagedFileSplit(NC1_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+                new ManagedFileSplit(NC2_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
+        RecordDescriptor ordersDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
                 new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         int outputLimit = 20;
-        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
-                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+                (INormalizedKeyComputerFactory) null,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index cfd4f30..d3b6b5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.util.ArrayList;
@@ -57,11 +57,11 @@
     static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
             new UTF8StringSerializerDeserializer() };
     static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
-    static Random GRandom = new Random(System.currentTimeMillis());
+    static Random GRandom = new Random(0);
     static int[] SortFields = new int[] { 0, 1 };
-    static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] {
-            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+    static IBinaryComparatorFactory[] ComparatorFactories =
+            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
 
     static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
         for (int i = 0; i < maxSize.size(); i++) {
@@ -69,25 +69,30 @@
         }
     }
 
-    abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
-            throws HyracksDataException;
+    abstract AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit,
+            int numOfInputRecord) throws HyracksDataException;
 
-    protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
-            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+    protected List<List<GeneratedRunFileReader>> testSortRecords(int pageSize, int frameLimit, int numRuns,
+            int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
         IHyracksTaskContext ctx = testUtils.create(pageSize);
 
         HashMap<Integer, String> keyValuePair = new HashMap<>();
         List<IFrame> frameList = new ArrayList<>();
         prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData,
                 keyValuePair);
-        AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
-        runGenerator.open();
-        for (IFrame frame : frameList) {
-            runGenerator.nextFrame(frame.getBuffer());
+
+        List<List<GeneratedRunFileReader>> results = new ArrayList<>();
+        AbstractSortRunGenerator[] runGenerators = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
+        for (AbstractSortRunGenerator runGenerator : runGenerators) {
+            runGenerator.open();
+            for (IFrame frame : frameList) {
+                runGenerator.nextFrame(frame.getBuffer());
+            }
+            runGenerator.close();
+            matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+            results.add(runGenerator.getRuns());
         }
-        runGenerator.close();
-        matchResult(ctx, runGenerator.getRuns(), keyValuePair);
-        return runGenerator.getRuns();
+        return results;
     }
 
     static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
@@ -114,7 +119,9 @@
             bbis.setByteBuffer(fta.getBuffer(),
                     fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
             String value = (String) RecordDesc.getFields()[1].deserialize(di);
-
+            if (!keyValuePair.containsKey(key)) {
+                assertTrue(false);
+            }
             if (!keyValuePair.get(key).equals(value)) {
                 assertTrue(false);
             }
@@ -146,7 +153,7 @@
 
     static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
             int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
-                    throws HyracksDataException {
+            throws HyracksDataException {
 
         ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
         FrameTupleAppender appender = new FrameTupleAppender();
@@ -158,8 +165,9 @@
                 tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
                 tb.addField(new UTF8StringSerializerDeserializer(), entry.getValue());
 
-                VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
-                        tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+                VSizeFrame frame =
+                        new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length,
+                                tb.getSize(), ctx.getInitialFrameSize()));
                 appender.reset(frame, true);
                 assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
                 frameList.add(frame);
@@ -226,9 +234,25 @@
         int numRuns = 2;
         int minRecordSize = pageSize / 8;
         int maxRecordSize = pageSize / 8;
-        List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
-                maxRecordSize, null);
-        assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        }
+    }
+
+    @Test
+    public void testAllManySmallRecords() throws HyracksDataException {
+        int pageSize = 10240;
+        int frameLimit = 4;
+        int numRuns = 2;
+        int minRecordSize = pageSize / 8;
+        int maxRecordSize = pageSize / 8;
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        }
     }
 
     @Test
@@ -238,9 +262,11 @@
         int numRuns = 2;
         int minRecordSize = pageSize;
         int maxRecordSize = (int) (pageSize * 1.8);
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                null);
-        assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize * 2);
+        }
     }
 
     @Test
@@ -250,15 +276,16 @@
         int numRuns = 4;
         int minRecordSize = 20;
         int maxRecordSize = pageSize / 2;
-        HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
-
-        int max = 0;
-        for (GeneratedRunFileReader run : size) {
-            max = Math.max(max, run.getMaxFrameSize());
+        HashMap<Integer, String> specialPair = generateBigObject(pageSize / 2, frameLimit - 1);
+        List<List<GeneratedRunFileReader>> sizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
+        for (List<GeneratedRunFileReader> size : sizes) {
+            int max = 0;
+            for (GeneratedRunFileReader run : size) {
+                max = Math.max(max, run.getMaxFrameSize());
+            }
+            assertTrue(max <= pageSize * (frameLimit - 1) && max >= pageSize * 2);
         }
-        assertTrue(max == pageSize * (frameLimit - 1));
     }
 
     @Test(expected = HyracksDataException.class)
@@ -269,8 +296,6 @@
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
         int minRecordSize = 10;
         int maxRecordSize = pageSize / 2;
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
-
+        testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
index fd57f1e..6765d1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
@@ -20,7 +20,11 @@
 package org.apache.hyracks.tests.unit;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
@@ -28,9 +32,20 @@
 public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest {
 
     @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException {
-        return new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, RecordDesc,
-                Algorithm.MERGE_SORT, frameLimit);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+                RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+        ExternalSortRunGenerator runGeneratorWithOneNormalizeKey = new ExternalSortRunGenerator(ctx, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+                RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+        ExternalSortRunGenerator runGeneratorWithNormalizeKeys = new ExternalSortRunGenerator(ctx, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory(),
+                        new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit,
+                Integer.MAX_VALUE);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizeKey,
+                runGeneratorWithNormalizeKeys };
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
index d219a56..5d9e771 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
@@ -19,23 +19,37 @@
 
 package org.apache.hyracks.tests.unit;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+import org.junit.Test;
 
 public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest {
     @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException {
-        return new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
-                RecordDesc);
+        HeapSortRunGenerator runGenerator = new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator runGeneratorWithOneNormalizedKey =
+                new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                        new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+                        ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator runGeneratorWithNormalizedKeys = new HeapSortRunGenerator(ctx, frameLimit,
+                numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+                runGeneratorWithNormalizedKeys };
+
     }
 
     @Test
-    public void testTopK(){
+    public void testTopK() {
 
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
deleted file mode 100644
index d1080f8..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.tests.unit;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
-
-public class HybridSortRunGenerator extends AbstractRunGeneratorTest {
-    @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
-            throws HyracksDataException {
-        return new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
-                RecordDesc);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
new file mode 100644
index 0000000..d91f1e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+
+public class HybridSortRunGeneratorTest extends AbstractRunGeneratorTest {
+    @Override
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        HybridTopKSortRunGenerator runGenerator = new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+        HybridTopKSortRunGenerator runGeneratorWithOneNormalizedKey =
+                new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                        new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+                        ComparatorFactories, RecordDesc);
+        HybridTopKSortRunGenerator runGeneratorWithNormalizedKeys = new HybridTopKSortRunGenerator(ctx, frameLimit,
+                numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+                runGeneratorWithNormalizedKeys };
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index c68d59d..a219518 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -19,16 +19,8 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.util.ArrayList;
@@ -71,7 +63,7 @@
         private final int numFrames;
         private final int minRecordSize;
         private final int maxRecordSize;
-        private TreeMap<Integer, String> result = new TreeMap<>();
+        private final TreeMap<Integer, String> result = new TreeMap<>();
         int maxFrameSize;
 
         ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
@@ -186,8 +178,8 @@
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMapList);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMapList);
     }
 
@@ -207,8 +199,8 @@
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMapList);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMapList);
     }
 
@@ -291,8 +283,8 @@
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMap);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMap);
     }
 
@@ -342,8 +334,8 @@
         for (GeneratedRunFileReader run : runGenerator.getRuns()) {
             runs.add(run);
         }
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
-                RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, RecordDesc);
 
         IFrame outFrame = new VSizeFrame(ctx);
         reader.open();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
index f621bf9..b2a8323 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -19,13 +19,8 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -38,10 +33,13 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
@@ -126,12 +124,24 @@
     }
 
     @Test
-    public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
-        int topK = 1;
+    public void testHybridTopKWithOneNormalizedKey() throws HyracksDataException {
+        int topK = SORT_FRAME_LIMIT;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
-                ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+                RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
+    }
 
+    @Test
+    public void testHybridTopKWithTwoNormalizedKeys() throws HyracksDataException {
+        int topK = SORT_FRAME_LIMIT;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(
+                ctx, SORT_FRAME_LIMIT, topK, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
 
     private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter)
@@ -148,7 +158,7 @@
 
         List<IFrame> frameList = new ArrayList<>();
         int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5;
-        int minRecordSize = 16;
+        int minRecordSize = 64;
         int maxRecordSize = 64;
 
         AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null,
@@ -162,7 +172,6 @@
 
         doSort(sorter, buffer);
 
-        assertEquals(0, sorter.getRuns().size());
         validateResult(sorter, topKAnswer);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 650c60d..23a6be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
@@ -139,11 +140,11 @@
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-        RecordDescriptor wordDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        RecordDescriptor wordDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
 
-        FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
-                new WordTupleParserFactory(), wordDesc);
+        FileScanOperatorDescriptor wordScanner =
+                new FileScanOperatorDescriptor(spec, splitsProvider, new WordTupleParserFactory(), wordDesc);
         createPartitionConstraint(spec, wordScanner, inSplits);
 
         RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -170,13 +171,16 @@
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
         } else {
-            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
-            IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
-                    ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
-                            wordDesc)
-                    : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
-                            new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
+            IBinaryComparatorFactory[] cfs =
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+            IOperatorDescriptor sorter =
+                    "memsort".equalsIgnoreCase(algo)
+                            ? new InMemorySortOperatorDescriptor(spec, keys,
+                                    new INormalizedKeyComputerFactory[] {
+                                            new UTF8StringNormalizedKeyComputerFactory() },
+                                    cfs, wordDesc)
+                            : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
+                                    new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
             createPartitionConstraint(spec, sorter, outSplits);
 
             IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -195,9 +199,9 @@
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-        IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
-                ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
-                : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        IOperatorDescriptor writer =
+                "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+                        : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, outSplits);
 
         IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 8ab0708..7e56004 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -19,10 +19,7 @@
 
 package org.apache.hyracks.examples.tpch.client;
 
-import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
-import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
-import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc;
-import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+import static org.apache.hyracks.examples.tpch.client.Common.*;
 
 import java.util.EnumSet;
 
@@ -31,6 +28,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -131,12 +129,12 @@
         createPartitionConstraint(spec, ordScanner, ordersSplits);
         AbstractSorterOperatorDescriptor sorter;
         if (usingHeapSorter && limit < Integer.MAX_VALUE) {
-            sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null,
-                    SortFieldsComparatorFactories, ordersDesc);
+            sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields,
+                    (INormalizedKeyComputerFactory) null, SortFieldsComparatorFactories, ordersDesc);
         } else {
             if (memBufferAlg.equalsIgnoreCase("bestfit")) {
-                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields,
-                        null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+                        SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
                         EnumFreeSlotPolicy.SMALLEST_FIT, limit);
             } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
                 sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
