diff --git a/hyracks/hyracks-dataflow-std/.classpath b/hyracks/hyracks-dataflow-std/.classpath
index 1f3c1ff..2c131fd 100644
--- a/hyracks/hyracks-dataflow-std/.classpath
+++ b/hyracks/hyracks-dataflow-std/.classpath
@@ -1,7 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="src" path="src/test/java"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
+	<classpathentry kind="output" path="target/android-classes"/>
 </classpath>
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 620aec8..168f34f 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -38,5 +38,12 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.2</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 4c0e715..6075151 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -41,6 +42,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -49,20 +51,27 @@
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String IN_FRAMES = "inFrames";
-    private static final String TPOINTERS = "tPointers";
+    private static final String FRAMESORTER = "framesorter";
     private static final String RUNS = "runs";
 
     private static final long serialVersionUID = 1L;
     private final int[] sortFields;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private final IBinaryComparatorFactory[] comparatorFactories;
     private final int framesLimit;
 
     public ExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor);
+    }
+
+    public ExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
         this.comparatorFactories = comparatorFactories;
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
@@ -95,64 +104,41 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
+            final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDescriptors[0]);
+            final int maxSortFrames = framesLimit - 1;
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                private final FrameTupleAccessor fta2 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                private List<ByteBuffer> inFrames;
-                private ByteBuffer outFrame;
                 private LinkedList<File> runs;
-                private int activeInFrame;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    inFrames = new ArrayList<ByteBuffer>();
-                    outFrame = ctx.getResourceManager().allocateFrame();
                     runs = new LinkedList<File>();
-                    activeInFrame = 0;
+                    frameSorter.reset();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    if (activeInFrame + 1 >= framesLimit) { // + 1 outFrame.
-                        try {
-                            createRunFromInFrames(inFrames.size());
-                        } catch (Exception e) {
-                            throw new HyracksDataException(e);
-                        }
+                    if (frameSorter.getFrameCount() >= maxSortFrames) {
+                        flushFramesToRun();
+                        frameSorter.reset();
                     }
-                    ByteBuffer copy;
-                    buffer.position(0);
-                    buffer.limit(buffer.capacity());
-                    if (runs.size() <= 0) {
-                        copy = ctx.getResourceManager().allocateFrame();
-                        copy.put(buffer);
-                        inFrames.add(copy);
-                    } else {
-                        copy = inFrames.get(activeInFrame);
-                        copy.put(buffer);
-                    }
-                    ++activeInFrame;
+                    frameSorter.insertFrame(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    env.set(IN_FRAMES, inFrames);
-                    env.set(RUNS, runs);
-                    if (activeInFrame > 0) {
+                    if (frameSorter.getFrameCount() > 0) {
                         if (runs.size() <= 0) {
-                            long[] tPointers = getSortedTPointers(activeInFrame);
-                            env.set(TPOINTERS, tPointers);
+                            frameSorter.sortFrames();
+                            env.set(FRAMESORTER, frameSorter);
                         } else {
-                            createRunFromInFrames(activeInFrame);
+                            flushFramesToRun();
                         }
                     }
+                    env.set(RUNS, runs);
                 }
 
-                private void createRunFromInFrames(int nBuffers) throws HyracksDataException {
+                private void flushFramesToRun() throws HyracksDataException {
                     File runFile;
                     try {
                         runFile = ctx.getResourceManager().createFile(
@@ -163,114 +149,12 @@
                     RunFileWriter writer = new RunFileWriter(runFile);
                     writer.open();
                     try {
-                        flushFrames(ctx, inFrames, outFrame, getSortedTPointers(nBuffers), writer);
+                        frameSorter.flushFrames(writer);
                     } finally {
                         writer.close();
                     }
+                    frameSorter.reset();
                     runs.add(runFile);
-                    activeInFrame = 0;
-                }
-
-                private long[] getSortedTPointers(int nBuffers) {
-                    FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                    int totalTCount = 0;
-                    for (int i = 0; i < nBuffers; ++i) {
-                        accessor.reset(inFrames.get(i));
-                        totalTCount += accessor.getTupleCount();
-                    }
-                    long[] tPointers = new long[totalTCount];
-                    int ptr = 0;
-                    for (int i = 0; i < nBuffers; ++i) {
-                        accessor.reset(inFrames.get(i));
-                        int tCount = accessor.getTupleCount();
-                        for (int j = 0; j < tCount; ++j) {
-                            tPointers[ptr++] = (((long) i) << 32) + j;
-                        }
-                    }
-                    if (tPointers.length > 0) {
-                        sort(tPointers, 0, tPointers.length);
-                    }
-                    return tPointers;
-                }
-
-                private void sort(long[] tPointers, int offset, int length) {
-                    int m = offset + (length >> 1);
-                    long v = tPointers[m];
-
-                    int a = offset;
-                    int b = a;
-                    int c = offset + length - 1;
-                    int d = c;
-                    while (true) {
-                        while (b <= c && compare(tPointers[b], v) <= 0) {
-                            if (compare(tPointers[b], v) == 0) {
-                                swap(tPointers, a++, b);
-                            }
-                            ++b;
-                        }
-                        while (c >= b && compare(tPointers[c], v) >= 0) {
-                            if (compare(tPointers[c], v) == 0) {
-                                swap(tPointers, c, d--);
-                            }
-                            --c;
-                        }
-                        if (b > c)
-                            break;
-                        swap(tPointers, b++, c--);
-                    }
-
-                    int s;
-                    int n = offset + length;
-                    s = Math.min(a - offset, b - a);
-                    vecswap(tPointers, offset, b - s, s);
-                    s = Math.min(d - c, n - d - 1);
-                    vecswap(tPointers, b, n - s, s);
-
-                    if ((s = b - a) > 1) {
-                        sort(tPointers, offset, s);
-                    }
-                    if ((s = d - c) > 1) {
-                        sort(tPointers, n - s, s);
-                    }
-                }
-
-                private void swap(long x[], int a, int b) {
-                    long t = x[a];
-                    x[a] = x[b];
-                    x[b] = t;
-                }
-
-                private void vecswap(long x[], int a, int b, int n) {
-                    for (int i = 0; i < n; i++, a++, b++) {
-                        swap(x, a, b);
-                    }
-                }
-
-                private int compare(long tp1, long tp2) {
-                    int i1 = (int) ((tp1 >> 32) & 0xffffffff);
-                    int j1 = (int) (tp1 & 0xffffffff);
-                    int i2 = (int) ((tp2 >> 32) & 0xffffffff);
-                    int j2 = (int) (tp2 & 0xffffffff);
-                    ByteBuffer buf1 = inFrames.get(i1);
-                    ByteBuffer buf2 = inFrames.get(i2);
-                    byte[] b1 = buf1.array();
-                    byte[] b2 = buf2.array();
-                    fta1.reset(buf1);
-                    fta2.reset(buf2);
-                    for (int f = 0; f < sortFields.length; ++f) {
-                        int fIdx = sortFields[f];
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
                 }
 
                 @Override
@@ -304,20 +188,21 @@
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    inFrames = (List<ByteBuffer>) env.get(IN_FRAMES);
-                    outFrame = ctx.getResourceManager().allocateFrame();
                     runs = (LinkedList<File>) env.get(RUNS);
-                    outFrameAppender = new FrameTupleAppender(ctx);
-                    outFrameAppender.reset(outFrame, true);
                     writer.open();
                     try {
                         if (runs.size() <= 0) {
-                            long[] tPointers = (long[]) env.get(TPOINTERS);
-                            if (tPointers != null) {
-                                flushFrames(ctx, inFrames, outFrame, tPointers, writer);
-                                env.set(TPOINTERS, null);
-                            }
+                            FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+                            frameSorter.flushFrames(writer);
+                            env.set(FRAMESORTER, null);
                         } else {
+                            inFrames = new ArrayList<ByteBuffer>();
+                            outFrame = ctx.getResourceManager().allocateFrame();
+                            outFrameAppender = new FrameTupleAppender(ctx);
+                            outFrameAppender.reset(outFrame, true);
+                            for (int i = 0; i < framesLimit - 1; ++i) {
+                                inFrames.add(ctx.getResourceManager().allocateFrame());
+                            }
                             int passCount = 0;
                             while (runs.size() > 0) {
                                 passCount++;
@@ -331,7 +216,6 @@
                     } finally {
                         writer.close();
                     }
-                    env.set(IN_FRAMES, null);
                     env.set(RUNS, null);
                 }
 
@@ -379,7 +263,7 @@
                             int tupleIndex = top.getTupleIndex();
 
                             if (!outFrameAppender.append(fta, tupleIndex)) {
-                                flushFrame(outFrame, writer);
+                                FrameUtils.flushFrame(outFrame, writer);
                                 outFrameAppender.reset(outFrame, true);
                                 if (!outFrameAppender.append(fta, tupleIndex)) {
                                     throw new IllegalStateException();
@@ -390,7 +274,7 @@
                             setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
                         }
                         if (outFrameAppender.getTupleCount() > 0) {
-                            flushFrame(outFrame, writer);
+                            FrameUtils.flushFrame(outFrame, writer);
                             outFrameAppender.reset(outFrame, true);
                         }
                         runs.subList(0, inFrames.size()).clear();
@@ -470,42 +354,6 @@
         };
     }
 
-    private void flushFrames(IHyracksContext ctx, List<ByteBuffer> inFrames, ByteBuffer outFrame, long[] tPointers,
-            IFrameWriter writer) throws HyracksDataException {
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-        FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx);
-        for (ByteBuffer buf : inFrames) {
-            buf.position(0);
-            buf.limit(buf.capacity());
-        }
-        outFrameAppender.reset(outFrame, true);
-        for (int ptr = 0; ptr < tPointers.length; ++ptr) {
-            long tp = tPointers[ptr];
-            int i = (int) ((tp >> 32) & 0xffffffff);
-            int j = (int) (tp & 0xffffffff);
-            ByteBuffer buffer = inFrames.get(i);
-            accessor.reset(buffer);
-            if (!outFrameAppender.append(accessor, j)) {
-                flushFrame(outFrame, writer);
-                outFrameAppender.reset(outFrame, true);
-                if (!outFrameAppender.append(accessor, j)) {
-                    throw new IllegalStateException();
-                }
-            }
-        }
-        if (outFrameAppender.getTupleCount() > 0) {
-            flushFrame(outFrame, writer);
-            outFrame.position(0);
-            outFrame.limit(outFrame.capacity());
-        }
-    }
-
-    private void flushFrame(ByteBuffer frame, IFrameWriter writer) throws HyracksDataException {
-        frame.position(0);
-        frame.limit(frame.capacity());
-        writer.nextFrame(frame);
-    }
-
     private class RunFileWriter implements IFrameWriter {
         private final File file;
         private FileChannel channel;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
new file mode 100644
index 0000000..a4f4999
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FrameSorter {
+    private final IHyracksContext ctx;
+    private final int[] sortFields;
+    private final INormalizedKeyComputer nkc;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final List<ByteBuffer> buffers;
+
+    private final FrameTupleAccessor fta1;
+    private final FrameTupleAccessor fta2;
+
+    private int dataFrameCount;
+    private int[] tPointers;
+
+    public FrameSorter(IHyracksContext ctx, int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        this.ctx = ctx; // used to allocate frames and to build accessors/appenders
+        this.sortFields = sortFields; // sortFields[0] is the field the normalized key is computed from
+        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        comparators = new IBinaryComparator[comparatorFactories.length]; // comparators[f] compares field sortFields[f]
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDescriptor = recordDescriptor;
+        buffers = new ArrayList<ByteBuffer>(); // grows on demand in insertFrame() and is kept across reset()
+        fta1 = new FrameTupleAccessor(ctx, recordDescriptor);
+        fta2 = new FrameTupleAccessor(ctx, recordDescriptor);
+        // A null firstKeyNormalizerFactory leaves nkc null; sortFrames() then stores 0 as every normalized key.
+        dataFrameCount = 0;
+    }
+
+    public void reset() { // Starts a new batch; frames already allocated stay in the list and get overwritten.
+        dataFrameCount = 0;
+        tPointers = null; // any previous sort result is discarded
+    }
+
+    public int getFrameCount() {
+        return dataFrameCount; // frames inserted since the last reset(); can be smaller than buffers.size()
+    }
+
+    /**
+     * Gets the sorted tuple pointers; null if sortFrames() has not run since construction or the last reset().
+     * A tuple is "pointed" to by 4 entries in the tPointers array.
+     * [0] = Frame index in the "Frames" list.
+     * [1] = Start offset of the tuple in the frame
+     * [2] = End offset of the tuple in the frame
+     * [3] = Poor man's normalized key for the tuple.
+     */
+    public int[] getTPointers() {
+        return tPointers;
+    }
+
+    public List<ByteBuffer> getFrames() {
+        return buffers; // the live internal list (no copy); entries at index >= getFrameCount() may hold stale data
+    }
+
+    public void insertFrame(ByteBuffer buffer) { // Copies buffer into the next frame slot of the current batch.
+        ByteBuffer copyFrame;
+        if (dataFrameCount == buffers.size()) { // every frame allocated so far is in use: allocate one more
+            copyFrame = ctx.getResourceManager().allocateFrame();
+            buffers.add(copyFrame);
+        } else { // reuse a frame left over from a batch that preceded the last reset()
+            copyFrame = buffers.get(dataFrameCount);
+        }
+        FrameUtils.copy(buffer, copyFrame);
+        ++dataFrameCount;
+    }
+
+    public void sortFrames() { // Builds tPointers (4 ints per tuple) for the current batch, then sorts it.
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+        int nBuffers = dataFrameCount; // not buffers.size(): frames past dataFrameCount hold stale pre-reset() data
+        int totalTCount = 0;
+        for (int i = 0; i < nBuffers; ++i) { // first pass: count tuples to size the pointer table
+            accessor.reset(buffers.get(i));
+            totalTCount += accessor.getTupleCount();
+        }
+        int sfIdx = sortFields[0]; // the normalized key is derived from the first sort field only
+        tPointers = new int[totalTCount * 4];
+        int ptr = 0;
+        for (int i = 0; i < nBuffers; ++i) { // second pass: one 4-int entry per tuple
+            accessor.reset(buffers.get(i));
+            int tCount = accessor.getTupleCount();
+            byte[] array = accessor.getBuffer().array();
+            for (int j = 0; j < tCount; ++j) {
+                int tStart = accessor.getTupleStartOffset(j);
+                int tEnd = accessor.getTupleEndOffset(j);
+                tPointers[ptr * 4] = i; // frame index
+                tPointers[ptr * 4 + 1] = tStart; // tuple start offset in the frame
+                tPointers[ptr * 4 + 2] = tEnd; // tuple end offset in the frame
+                int f0StartRel = accessor.getFieldStartOffset(j, sfIdx);
+                int f0EndRel = accessor.getFieldEndOffset(j, sfIdx);
+                int f0Start = f0StartRel + tStart + accessor.getFieldSlotsLength(); // absolute offset of the field data
+                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
+                ++ptr;
+            }
+        }
+        if (tPointers.length > 0) { // sort() reads the middle entry, so it must not run on an empty table
+            sort(tPointers, 0, totalTCount);
+        }
+    }
+
+    public void flushFrames(IFrameWriter writer) throws HyracksDataException { // caller opens and closes writer
+        if (tPointers == null) {
+            sortFrames(); // not sorted since the last reset(): sort now instead of failing on a null table
+        }
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+        FrameTupleAppender appender = new FrameTupleAppender(ctx);
+        ByteBuffer outFrame = ctx.getResourceManager().allocateFrame();
+        appender.reset(outFrame, true);
+        int n = tPointers.length / 4; // number of tuples: each one owns 4 ints in tPointers
+        for (int ptr = 0; ptr < n; ++ptr) {
+            int tStart = tPointers[ptr * 4 + 1];
+            int tEnd = tPointers[ptr * 4 + 2];
+            accessor.reset(buffers.get(tPointers[ptr * 4])); // entry [0] is the index of the frame holding the tuple
+            if (!appender.append(accessor, tStart, tEnd)) { // output frame is full: emit it and retry on an empty one
+                FrameUtils.flushFrame(outFrame, writer);
+                appender.reset(outFrame, true);
+                if (!appender.append(accessor, tStart, tEnd)) { // the tuple does not fit even in an empty frame
+                    throw new IllegalStateException();
+                }
+            }
+        }
+        if (appender.getTupleCount() > 0) { // emit the last, partially filled frame
+            FrameUtils.flushFrame(outFrame, writer);
+        }
+    }
+
+    private void sort(int[] tPointers, int offset, int length) { // offset/length count 4-int entries, not ints
+        int m = offset + (length >> 1);
+        int mi = tPointers[m * 4]; // pivot = middle entry, captured by value because entries move while partitioning
+        int mj = tPointers[m * 4 + 1];
+        int mv = tPointers[m * 4 + 3];
+
+        int a = offset; // [offset, a) collects entries equal to the pivot found while scanning from the left
+        int b = a; // left scan cursor
+        int c = offset + length - 1; // right scan cursor
+        int d = c; // (d, offset + length) collects entries equal to the pivot found while scanning from the right
+        while (true) {
+            while (b <= c) { // advance b over entries <= pivot
+                int cmp = compare(tPointers, b, mi, mj, mv);
+                if (cmp > 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, a++, b);
+                }
+                ++b;
+            }
+            while (c >= b) { // retreat c over entries >= pivot
+                int cmp = compare(tPointers, c, mi, mj, mv);
+                if (cmp < 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, c, d--);
+                }
+                --c;
+            }
+            if (b > c)
+                break;
+            swap(tPointers, b++, c--); // entry at b is > pivot and entry at c is < pivot: exchange them
+        }
+
+        int s;
+        int n = offset + length;
+        s = Math.min(a - offset, b - a);
+        vecswap(tPointers, offset, b - s, s); // move the left run of pivot-equal entries next to the middle
+        s = Math.min(d - c, n - d - 1);
+        vecswap(tPointers, b, n - s, s); // move the right run of pivot-equal entries next to the middle
+
+        if ((s = b - a) > 1) { // recurse on the entries smaller than the pivot
+            sort(tPointers, offset, s);
+        }
+        if ((s = d - c) > 1) { // recurse on the entries greater than the pivot
+            sort(tPointers, n - s, s);
+        }
+    }
+
+    private void swap(int x[], int a, int b) { // a and b are entry indexes; an entry is 4 consecutive ints
+        for (int i = 0; i < 4; ++i) {
+            int t = x[a * 4 + i];
+            x[a * 4 + i] = x[b * 4 + i];
+            x[b * 4 + i] = t;
+        }
+    }
+
+    private void vecswap(int x[], int a, int b, int n) { // swaps the n entries starting at a with the n starting at b
+        for (int i = 0; i < n; i++, a++, b++) {
+            swap(x, a, b);
+        }
+    }
+
+    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) { // entry tp1 vs. pivot (tp2i, tp2j, tp2v)
+        int i1 = tPointers[tp1 * 4]; // frame index of the first tuple
+        int j1 = tPointers[tp1 * 4 + 1]; // its start offset in that frame
+        int v1 = tPointers[tp1 * 4 + 3]; // its normalized key
+        if (v1 != tp2v) { // differing normalized keys decide the order; they are compared as unsigned 32-bit values
+            return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
+        }
+        int i2 = tp2i;
+        int j2 = tp2j;
+        ByteBuffer buf1 = buffers.get(i1);
+        ByteBuffer buf2 = buffers.get(i2);
+        byte[] b1 = buf1.array();
+        byte[] b2 = buf2.array();
+        fta1.reset(buf1); // the accessors are only used below for getFieldSlotsLength()
+        fta2.reset(buf2);
+        for (int f = 0; f < sortFields.length; ++f) { // equal normalized keys: compare the sort fields in order
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : buf1.getShort(j1 + (fIdx - 1) * 2); // a field starts where the previous one ends
+            int f1End = buf1.getShort(j1 + fIdx * 2); // field end offsets are read as shorts from the start of the tuple
+            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start; // field data follows the slots
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : buf2.getShort(j2 + (fIdx - 1) * 2);
+            int f2End = buf2.getShort(j2 + fIdx * 2);
+            int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0; // equal on every sort field
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 8664fdd..7f6baa7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -15,40 +15,42 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String BUFFERS = "buffers";
-    private static final String TPOINTERS = "tpointers";
+    private static final String FRAMESORTER = "framesorter";
 
     private static final long serialVersionUID = 1L;
     private final int[] sortFields;
+    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
     private IBinaryComparatorFactory[] comparatorFactories;
 
     public InMemorySortOperatorDescriptor(JobSpecification spec, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        this(spec, sortFields, null, comparatorFactories, recordDescriptor);
+    }
+
+    public InMemorySortOperatorDescriptor(JobSpecification spec, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
         this.comparatorFactories = comparatorFactories;
         recordDescriptors[0] = recordDescriptor;
     }
@@ -78,131 +80,23 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
+            final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDescriptors[0]);
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private List<ByteBuffer> buffers;
-
-                private final FrameTupleAccessor fta1 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                private final FrameTupleAccessor fta2 = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-
                 @Override
                 public void open() throws HyracksDataException {
-                    buffers = new ArrayList<ByteBuffer>();
-                    env.set(BUFFERS, buffers);
+                    frameSorter.reset();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copy = ctx.getResourceManager().allocateFrame();
-                    FrameUtils.copy(buffer, copy);
-                    buffers.add(copy);
+                    frameSorter.insertFrame(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                    int nBuffers = buffers.size();
-                    int totalTCount = 0;
-                    for (int i = 0; i < nBuffers; ++i) {
-                        accessor.reset(buffers.get(i));
-                        totalTCount += accessor.getTupleCount();
-                    }
-                    long[] tPointers = new long[totalTCount];
-                    int ptr = 0;
-                    for (int i = 0; i < nBuffers; ++i) {
-                        accessor.reset(buffers.get(i));
-                        int tCount = accessor.getTupleCount();
-                        for (int j = 0; j < tCount; ++j) {
-                            tPointers[ptr++] = (((long) i) << 32) + j;
-                        }
-                    }
-                    if (tPointers.length > 0) {
-                        sort(tPointers, 0, tPointers.length);
-                    }
-                    env.set(TPOINTERS, tPointers);
-                }
-
-                private void sort(long[] tPointers, int offset, int length) {
-                    int m = offset + (length >> 1);
-                    long v = tPointers[m];
-
-                    int a = offset;
-                    int b = a;
-                    int c = offset + length - 1;
-                    int d = c;
-                    while (true) {
-                        while (b <= c && compare(tPointers[b], v) <= 0) {
-                            if (compare(tPointers[b], v) == 0) {
-                                swap(tPointers, a++, b);
-                            }
-                            ++b;
-                        }
-                        while (c >= b && compare(tPointers[c], v) >= 0) {
-                            if (compare(tPointers[c], v) == 0) {
-                                swap(tPointers, c, d--);
-                            }
-                            --c;
-                        }
-                        if (b > c)
-                            break;
-                        swap(tPointers, b++, c--);
-                    }
-
-                    int s;
-                    int n = offset + length;
-                    s = Math.min(a - offset, b - a);
-                    vecswap(tPointers, offset, b - s, s);
-                    s = Math.min(d - c, n - d - 1);
-                    vecswap(tPointers, b, n - s, s);
-
-                    if ((s = b - a) > 1) {
-                        sort(tPointers, offset, s);
-                    }
-                    if ((s = d - c) > 1) {
-                        sort(tPointers, n - s, s);
-                    }
-                }
-
-                private void swap(long x[], int a, int b) {
-                    long t = x[a];
-                    x[a] = x[b];
-                    x[b] = t;
-                }
-
-                private void vecswap(long x[], int a, int b, int n) {
-                    for (int i = 0; i < n; i++, a++, b++) {
-                        swap(x, a, b);
-                    }
-                }
-
-                private int compare(long tp1, long tp2) {
-                    int i1 = (int) ((tp1 >> 32) & 0xffffffff);
-                    int j1 = (int) (tp1 & 0xffffffff);
-                    int i2 = (int) ((tp2 >> 32) & 0xffffffff);
-                    int j2 = (int) (tp2 & 0xffffffff);
-                    ByteBuffer buf1 = buffers.get(i1);
-                    ByteBuffer buf2 = buffers.get(i2);
-                    byte[] b1 = buf1.array();
-                    byte[] b2 = buf2.array();
-                    fta1.reset(buf1);
-                    fta2.reset(buf2);
-                    for (int f = 0; f < sortFields.length; ++f) {
-                        int fIdx = sortFields[f];
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
+                    frameSorter.sortFrames();
+                    env.set(FRAMESORTER, frameSorter);
                 }
 
                 @Override
@@ -227,39 +121,11 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    List<ByteBuffer> buffers = (List<ByteBuffer>) env.get(BUFFERS);
-                    long[] tPointers = (long[]) env.get(TPOINTERS);
-                    FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                    FrameTupleAppender appender = new FrameTupleAppender(ctx);
-                    ByteBuffer outFrame = ctx.getResourceManager().allocateFrame();
                     writer.open();
-                    appender.reset(outFrame, true);
-                    for (int ptr = 0; ptr < tPointers.length; ++ptr) {
-                        long tp = tPointers[ptr];
-                        int i = (int) ((tp >> 32) & 0xffffffff);
-                        int j = (int) (tp & 0xffffffff);
-                        ByteBuffer buffer = buffers.get(i);
-                        accessor.reset(buffer);
-                        if (!appender.append(accessor, j)) {
-                            flushFrame(outFrame);
-                            appender.reset(outFrame, true);
-                            if (!appender.append(accessor, j)) {
-                                throw new IllegalStateException();
-                            }
-                        }
-                    }
-                    if (appender.getTupleCount() > 0) {
-                        flushFrame(outFrame);
-                    }
+                    FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+                    frameSorter.flushFrames(writer);
                     writer.close();
-                    env.set(BUFFERS, null);
-                    env.set(TPOINTERS, null);
-                }
-
-                private void flushFrame(ByteBuffer frame) throws HyracksDataException {
-                    frame.position(0);
-                    frame.limit(frame.capacity());
-                    writer.nextFrame(frame);
+                    env.set(FRAMESORTER, null);
                 }
             };
             return op;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SelectionTree.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SelectionTree.java
new file mode 100644
index 0000000..2a288cc
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SelectionTree.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.util;
+
+import java.util.BitSet;
+
+/**
+ * A tournament ("loser") tree that repeatedly selects the smallest current element among a fixed
+ * set of inputs, using the ordering defined by each entry's {@code compareTo}.
+ * <p>
+ * Typical use is a k-way merge: while {@link #peek()} returns a non-null entry, consume that
+ * entry's current element and call {@link #pop()} to advance it and re-select the winner.
+ */
+public class SelectionTree {
+    /** Number of leaves in the tree: the number of inputs rounded up to an even value. */
+    private final int size;
+
+    /** The inputs being merged; {@link #pop()} nulls a slot once that input is exhausted. */
+    private final Entry[] entries;
+
+    /**
+     * {@code losers[0]} is the index of the current overall winner; every other slot holds the
+     * index that lost the match played at that internal node. {@code -1} marks a node that has
+     * not been filled yet while the constructor is building the tree.
+     */
+    private final int[] losers;
+
+    /** Bit {@code i} is set while {@code entries[i]} is positioned on an element. */
+    private final BitSet available;
+
+    /**
+     * Builds the tree over the given inputs, calling {@link Entry#advance()} once on each input
+     * to position it on its first element.
+     *
+     * @param e
+     *            the inputs to merge; the array is retained (not copied) and {@link #pop()}
+     *            overwrites exhausted slots with {@code null}
+     */
+    public SelectionTree(Entry[] e) {
+        size = (e.length + 1) & 0xfffffffe;
+        available = new BitSet(size);
+        available.set(0, e.length, true);
+        losers = new int[size];
+        entries = e;
+        for (int i = 0; i < size; ++i) {
+            losers[i] = -1;
+        }
+        // Insert the leaves one at a time, replaying matches from each leaf's parent to the root.
+        for (int i = 0; i < size; ++i) {
+            int slot = (size + i) >> 1;
+
+            // Index e.length (present only when e.length is odd) is padding and never available.
+            if (i < entries.length) {
+                available.set(i, entries[i].advance());
+            }
+            int currIdx = i;
+            while (slot > 0) {
+                // cmp <= 0 means the index stored at this node moves up and currIdx stays behind.
+                int cmp = 0;
+                if (losers[slot] < 0 || currIdx < 0) {
+                    // An empty node takes currIdx; an empty currIdx passes a filled node untouched.
+                    cmp = losers[slot] < 0 ? -1 : 1;
+                } else if (!available.get(losers[slot])) {
+                    cmp = 1;
+                } else if (available.get(currIdx)) {
+                    if (currIdx <= i) {
+                        cmp = entries[losers[slot]].compareTo(entries[currIdx]);
+                    } else {
+                        cmp = 1;
+                    }
+                }
+
+                if (cmp <= 0) {
+                    int tmp = losers[slot];
+                    losers[slot] = currIdx;
+                    currIdx = tmp;
+                }
+                slot >>= 1;
+            }
+            losers[0] = currIdx;
+        }
+    }
+
+    /**
+     * Returns the entry holding the smallest current element without advancing it.
+     *
+     * @return the winning entry, or {@code null} if there are no inputs or all are exhausted
+     */
+    public Entry peek() {
+        // Checking the bitset (rather than entries.length) also covers inputs that were all empty
+        // at construction, where losers[0] may name an exhausted entry or the padding leaf.
+        if (available.isEmpty()) {
+            return null;
+        }
+        return entries[losers[0]];
+    }
+
+    /**
+     * Advances the current winner to its next element and re-selects the winner. Does nothing
+     * when {@link #peek()} would return {@code null}.
+     */
+    public void pop() {
+        if (available.isEmpty()) {
+            return;
+        }
+        int winner = losers[0];
+        int slot = (size + winner) >> 1;
+
+        boolean avail = entries[winner].advance();
+        if (!avail) {
+            entries[winner] = null;
+        }
+        available.set(winner, avail);
+        int currIdx = winner;
+        // Replay the matches on the path from the winner's leaf to the root.
+        while (!available.isEmpty() && slot > 0) {
+            int cmp = 0;
+            if (!available.get(losers[slot])) {
+                cmp = 1;
+            } else if (available.get(currIdx)) {
+                cmp = entries[losers[slot]].compareTo(entries[currIdx]);
+            }
+
+            if (cmp <= 0) {
+                int tmp = losers[slot];
+                losers[slot] = currIdx;
+                currIdx = tmp;
+            }
+            slot >>= 1;
+        }
+        losers[0] = currIdx;
+    }
+
+    /** One input of the merge: a cursor over a sequence, comparable by its current element. */
+    public interface Entry extends Comparable<Entry> {
+        /**
+         * Moves this entry to its next element.
+         *
+         * @return {@code true} if the entry is now positioned on an element, {@code false} if
+         *         the input is exhausted
+         */
+        public abstract boolean advance();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/test/util/SelectionTreeTest.java b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/test/util/SelectionTreeTest.java
new file mode 100644
index 0000000..dc5b599
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/edu/uci/ics/hyracks/dataflow/std/test/util/SelectionTreeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.test.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.dataflow.std.util.SelectionTree;
+import edu.uci.ics.hyracks.dataflow.std.util.SelectionTree.Entry;
+
+/**
+ * Checks that {@link SelectionTree} merges several ascending inputs into a single ascending
+ * stream and yields the expected number of elements.
+ */
+public class SelectionTreeTest {
+    /** Merges five arithmetic sequences and verifies both the order and the element count. */
+    @Test
+    public void sortMergeTest() {
+        final int n = 5;
+        SelectionTree.Entry[] entries = new SelectionTree.Entry[n];
+        int expectedCount = 0;
+        for (int i = 0; i < n; ++i) {
+            entries[i] = new MergeEntry(0, i * n + i, n);
+            // Drain an identical probe to learn how many elements this input contributes.
+            MergeEntry probe = new MergeEntry(0, i * n + i, n);
+            while (probe.advance()) {
+                ++expectedCount;
+            }
+        }
+        SelectionTree tree = new SelectionTree(entries);
+        SelectionTree.Entry e;
+        int last = Integer.MIN_VALUE;
+        int count = 0;
+        while ((e = tree.peek()) != null) {
+            MergeEntry me = (MergeEntry) e;
+            Assert.assertTrue("out of order: " + me.i + " after " + last, me.i >= last);
+            last = me.i;
+            ++count;
+            tree.pop();
+        }
+        Assert.assertEquals("number of merged elements", expectedCount, count);
+    }
+
+    /**
+     * An arithmetic sequence starting at {@code min}. {@link #advance()} keeps succeeding until
+     * the current value has passed {@code max}, so (for a positive step) the last value produced
+     * is the first one greater than {@code max}.
+     */
+    private static class MergeEntry implements SelectionTree.Entry {
+        private int i;
+        private final int max;
+        private final int step;
+
+        public MergeEntry(int min, int max, int step) {
+            this.max = max;
+            this.step = step;
+            // Start one step before min so that the first advance() lands on min.
+            i = min - step;
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            return i < ((MergeEntry) o).i ? -1 : (i == ((MergeEntry) o).i ? 0 : 1);
+        }
+
+        @Override
+        public boolean advance() {
+            if (i > max) {
+                return false;
+            }
+            i += step;
+            return true;
+        }
+    }
+}
