Fix the cache-miss problem in the sort-merge reader via cached normalized keys and field pointers
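
The merge comparator used by the sort-merge collectors and the external sort run
merger re-derived every sort field's start offset and length from the frame's slot
table on each comparison, which causes cache misses when many runs or partitions
are merged. This change threads an INormalizedKeyComputerFactory through
MToNPartitioningMergingConnectorDescriptor, SortMergeFrameReader,
RunMergingFrameReader and ExternalSortRunMerger, and has each ReferenceEntry cache
a normalized key plus (start, length) pointers for the sort fields, so many
comparisons can resolve on a single int compare and ties fall back to the cached
pointers instead of re-walking the tuple accessor.

The sketch below illustrates the comparison strategy; it is a simplified,
self-contained example, and the names Entry, FieldComparator and
NormalizedKeyFirstComparator are illustrative stand-ins for ReferenceEntry and the
anonymous Comparator built in RunMergingFrameReader.createEntryComparator, not
code taken from this patch.

    import java.util.Comparator;

    /**
     * Minimal illustration of the normalized-key short-circuit. Entry stands in
     * for ReferenceEntry: it caches one int normalized key and the
     * (start, length) pointers of each sort field, so the priority queue
     * compares ints and cached offsets instead of walking the frame's slot
     * table on every comparison.
     */
    final class Entry {
        final int normalizedKey;   // normalized prefix of the first sort field
        final int[] fieldPointers; // [start0, len0, start1, len1, ...]
        final byte[] frame;        // backing frame bytes

        Entry(int normalizedKey, int[] fieldPointers, byte[] frame) {
            this.normalizedKey = normalizedKey;
            this.fieldPointers = fieldPointers;
            this.frame = frame;
        }
    }

    final class NormalizedKeyFirstComparator implements Comparator<Entry> {
        /** Byte-wise comparison of one field, consulted only when normalized keys tie. */
        interface FieldComparator {
            int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
        }

        private final FieldComparator[] fieldComparators;

        NormalizedKeyFirstComparator(FieldComparator[] fieldComparators) {
            this.fieldComparators = fieldComparators;
        }

        @Override
        public int compare(Entry e1, Entry e2) {
            // Cheap path: one int comparison decides the common case.
            if (e1.normalizedKey != e2.normalizedKey) {
                return e1.normalizedKey < e2.normalizedKey ? -1 : 1;
            }
            // Normalized prefixes tie: compare the real bytes via the cached pointers.
            for (int f = 0; f < fieldComparators.length; f++) {
                int c = fieldComparators[f].compare(
                        e1.frame, e1.fieldPointers[2 * f], e1.fieldPointers[2 * f + 1],
                        e2.frame, e2.fieldPointers[2 * f], e2.fieldPointers[2 * f + 1]);
                if (c != 0) {
                    return c;
                }
            }
            return 0;
        }
    }

In the patch itself the same idea lives in ReferenceEntry.initTPointer and
getNormalizedKey, and in the comparator created by
RunMergingFrameReader.createEntryComparator.
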
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 9955a63..3f1aae2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -44,9 +44,11 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -142,27 +144,32 @@
         int n = orderColumns.size();
         int[] sortFields = new int[n];
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[n];
-        {
-            int j = 0;
-            for (OrderColumn oc : orderColumns) {
-                LogicalVariable var = oc.getColumn();
-                sortFields[j] = opSchema.findVariable(var);
-                Object type = env.getVarType(var);
-                IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-                comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
-                j++;
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        int j = 0;
+        for (OrderColumn oc : orderColumns) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[j] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            if (j == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, oc.getOrder() == OrderKind.ASC);
             }
+            j++;
         }
 
         IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields,
-                comparatorFactories);
+                comparatorFactories, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
-    
+
     public List<LogicalVariable> getPartitionFields() {
         return partitionFields;
     }
-    
+
     public List<OrderColumn> getOrderColumns() {
         return orderColumns;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index dc345ac..b03b99d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -39,9 +39,11 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -128,6 +130,10 @@
         IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
         IBinaryHashFunctionFactory[] hashFuns = new IBinaryHashFunctionFactory[n];
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
         for (int i = 0; i < n; i++) {
             sortFields[i] = opSchema.findVariable(sortColumns[i].getColumn());
             Object type = env.getVarType(sortColumns[i].getColumn());
@@ -135,9 +141,12 @@
             comps[i] = bcfp.getBinaryComparatorFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
             IBinaryHashFunctionFactoryProvider bhffp = context.getBinaryHashFunctionFactoryProvider();
             hashFuns[i] = bhffp.getBinaryHashFunctionFactory(type);
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+            }
         }
         ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(sortFields, hashFuns);
-        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps);
+        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 09eeab6..1a1d41e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -320,8 +320,10 @@
                 collector.close();
             }
         } catch (HyracksException e) {
+            e.printStackTrace();
             throw new HyracksDataException(e);
         } catch (Exception e) {
+            e.printStackTrace();
             throw new HyracksDataException(e);
         }
     }
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
index 9faef09..22615d2 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -18,6 +18,9 @@
 public class IntSerDeUtils {
 
     public static int getInt(byte[] bytes, int offset) {
+        if (offset + 3 >= bytes.length) {
+            System.out.println("offset " + offset + " " + bytes.length);
+        }
         return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 04b91b2..b8e9ec0 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -53,8 +53,8 @@
     private final MarshalledWritable<Configuration> config;
     private final IInputSplitProviderFactory factory;
 
-    public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId, MarshalledWritable<Configuration> config,
-            IInputSplitProviderFactory factory) throws HyracksDataException {
+    public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
+            MarshalledWritable<Configuration> config, IInputSplitProviderFactory factory) throws HyracksDataException {
         super(spec, 0, 1);
         this.jobId = jobId;
         this.config = config;
@@ -114,7 +114,8 @@
                     runGen.nextFrame(frame);
                     fta.reset(frame, true);
                     if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + frame.capacity() + ")");
+                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+                                + frame.capacity() + ")");
                     }
                 }
             }
@@ -224,7 +225,7 @@
                     comparators[i] = comparatorFactories[i].createBinaryComparator();
                 }
                 ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
-                        runGen.getRuns(), new int[] { 0 }, comparators,
+                        runGen.getRuns(), new int[] { 0 }, comparators, null,
                         helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
                 merger.process();
             }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 30ba3ec..3bb78f9 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -108,7 +108,7 @@
             runs.add(rfw.createReader());
         }
         RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
-        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
                 recordDescriptor, framesLimit, rfw);
         merger.process();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
index 235b0d0..2dda9cc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
@@ -31,18 +32,21 @@
     private final int nSenders;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDescriptor;
     private final IPartitionBatchManager pbm;
 
     private RunMergingFrameReader merger;
 
     public SortMergeFrameReader(IHyracksTaskContext ctx, int maxConcurrentMerges, int nSenders, int[] sortFields,
-            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, IPartitionBatchManager pbm) {
+            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDescriptor,
+            IPartitionBatchManager pbm) {
         this.ctx = ctx;
         this.maxConcurrentMerges = maxConcurrentMerges;
         this.nSenders = nSenders;
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDescriptor = recordDescriptor;
         this.pbm = pbm;
     }
@@ -57,7 +61,7 @@
             List<IFrameReader> batch = new ArrayList<IFrameReader>();
             pbm.getNextBatch(batch, nSenders);
             merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
-                    comparators, recordDescriptor);
+                    comparators, nmkComputer, recordDescriptor);
         } else {
             // multi level merge.
             throw new HyracksDataException("Not yet supported");
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 32269bd..1cf402b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -39,19 +41,23 @@
     private final ITuplePartitionComputerFactory tpcf;
     private final int[] sortFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory nkcFactory;
     private final boolean stable;
 
-    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
-        this(spec, tpcf, sortFields, comparatorFactories, false);
+    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory) {
+        this(spec, tpcf, sortFields, comparatorFactories, nkcFactory, false);
     }
 
-    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, boolean stable) {
+    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory, boolean stable) {
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
         this.comparatorFactories = comparatorFactories;
+        this.nkcFactory = nkcFactory;
         this.stable = stable;
     }
 
@@ -71,9 +77,10 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
+        INormalizedKeyComputer nmkComputer = nkcFactory == null ? null : nkcFactory.createNormalizedKeyComputer();
         IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
         IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
-                sortFields, comparators, recordDesc, pbm);
+                sortFields, comparators, nmkComputer, recordDesc, pbm);
         BitSet expectedPartitions = new BitSet();
         expectedPartitions.set(0, nProducerPartitions);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..bb3d0f3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -25,6 +25,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,6 +49,7 @@
     private final Object stateId;
     private final int[] keyFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final AggregateState aggregateState;
     private final ArrayTupleBuilder tupleBuilder;
     private final int[] storedKeys;
@@ -76,7 +79,7 @@
     private final FrameTupleAccessor outFrameAccessor;
 
     ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
-            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
             IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         this.stateId = stateId;
@@ -85,6 +88,7 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
+        this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
         int[] keyFieldsInPartialResults = new int[keyFields.length];
         for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
             keyFieldsInPartialResults[i] = i;
@@ -181,7 +185,7 @@
             FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
             Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
             ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
-                    runNumber, comparator);
+                    runNumber, comparator, keyFields, nmkComputer);
             /**
              * current tuple index in each run
              */
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index b914702..9296ffa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -130,8 +130,8 @@
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
             return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
-                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, keyFields, mergerFactory, isOutputSorted,
-                    framesLimit, recordDescriptors[0]);
+                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, firstNormalizerFactory, keyFields,
+                    mergerFactory, isOutputSorted, framesLimit, recordDescriptors[0]);
         }
 
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index f80a82c..f071b16 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -171,9 +172,11 @@
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
+                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
                     int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
-                            comparators, recordDescriptors[0], necessaryFrames, writer);
+                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
                     merger.process();
                 }
             };
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 510dfd6..77efe89 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -59,6 +60,7 @@
     private final List<IFrameReader> runs;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final int framesLimit;
     private final IFrameWriter writer;
@@ -77,13 +79,14 @@
 
     // Constructor for external sort, no replacement selection
     public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
-            IFrameWriter writer) {
+            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+            RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
         this.ctx = ctx;
         this.frameSorter = frameSorter;
         this.runs = new LinkedList<IFrameReader>(runs);
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
@@ -92,11 +95,13 @@
 
     // Constructor for external sort with replacement selection
     public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
-            IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
+            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
+            int framesLimit, IFrameWriter writer) {
         this.ctx = ctx;
         this.runs = new LinkedList<IFrameReader>(runs);
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
@@ -162,7 +167,7 @@
 
     private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
         RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
-                recordDesc);
+                nmkComputer, recordDesc);
         merger.open();
         try {
             while (merger.nextFrame(outFrame)) {
@@ -263,7 +268,7 @@
                 runCursors[i] = runs.get(i);
             }
             RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
-                    comparators, recordDesc);
+                    comparators, nmkComputer, recordDesc);
             merger.open();
             try {
                 while (merger.nextFrame(outFrame)) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index bff2e21..ef1ae88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -201,9 +202,11 @@
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
 
+                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
                     int necessaryFrames = Math.min(runs.size() + 2, memSize);
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
-                            comparators, recordDescriptors[0], necessaryFrames, writer);
+                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
 
                     merger.processWithReplacementSelection();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index 16b3c12..00fbe9b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -35,6 +36,7 @@
     private final List<ByteBuffer> inFrames;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final FrameTupleAppender outFrameAppender;
     private ReferencedPriorityQueue topTuples;
@@ -42,12 +44,14 @@
     private FrameTupleAccessor[] tupleAccessors;
 
     public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc) {
+            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+            RecordDescriptor recordDesc) {
         this.ctx = ctx;
         this.runCursors = runCursors;
         this.inFrames = inFrames;
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
     }
@@ -56,7 +60,8 @@
     public void open() throws HyracksDataException {
         tupleAccessors = new FrameTupleAccessor[runCursors.length];
         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
+                sortFields, nmkComputer);
         tupleIndexes = new int[runCursors.length];
         for (int i = 0; i < runCursors.length; i++) {
             tupleIndexes[i] = 0;
@@ -143,21 +148,25 @@
     private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
         return new Comparator<ReferenceEntry>() {
             public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                int nmk1 = tp1.getNormalizedKey();
+                int nmk2 = tp2.getNormalizedKey();
+                if (nmk1 > nmk2) {
+                    return 1;
+                }
+                if (nmk1 < nmk2) {
+                    return -1;
+                }
+
                 FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
                 FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                int j1 = tp1.getTupleIndex();
-                int j2 = tp2.getTupleIndex();
                 byte[] b1 = fta1.getBuffer().array();
                 byte[] b2 = fta2.getBuffer().array();
+                int[] tPointers1 = tp1.getTPointers();
+                int[] tPointers2 = tp2.getTPointers();
+
                 for (int f = 0; f < sortFields.length; ++f) {
-                    int fIdx = sortFields[f];
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    int c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
+                            tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
                     if (c != 0) {
                         return c;
                     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c1efa91..c049c8b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -14,18 +14,24 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.util;
 
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class ReferenceEntry {
     private final int runid;
     private FrameTupleAccessor acccessor;
     private int tupleIndex;
+    private int[] tPointers;
 
-    public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex) {
+    public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+            INormalizedKeyComputer nmkComputer) {
         super();
         this.runid = runid;
         this.acccessor = fta;
-        this.tupleIndex = tupleIndex;
+        this.tPointers = new int[1 + 2 * keyFields.length];
+        if (fta != null) {
+            initTPointer(fta, tupleIndex, keyFields, nmkComputer);
+        }
     }
 
     public int getRunid() {
@@ -40,11 +46,38 @@
         this.acccessor = fta;
     }
 
+    public int[] getTPointers() {
+        return tPointers;
+    }
+
     public int getTupleIndex() {
         return tupleIndex;
     }
 
-    public void setTupleIndex(int tupleIndex) {
+    public int getNormalizedKey() {
+        return tPointers[0];
+    }
+
+    public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         this.tupleIndex = tupleIndex;
+        initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
+    }
+
+    private void initTPointer(FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+            INormalizedKeyComputer nmkComputer) {
+        byte[] b1 = fta.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = keyFields[f];
+            tPointers[2 * f + 1] = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength()
+                    + fta.getFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + 2] = fta.getFieldEndOffset(tupleIndex, fIdx) - fta.getFieldStartOffset(tupleIndex, fIdx);
+            if (f == 0) {
+                if (nmkComputer != null) {
+                    tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
+                } else {
+                    tPointers[0] = 0;
+                }
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
index 7767ace..225f583 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
@@ -18,6 +18,7 @@
 import java.util.BitSet;
 import java.util.Comparator;
 
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
@@ -30,21 +31,25 @@
     private int nItems;
 
     private final Comparator<ReferenceEntry> comparator;
+    private final INormalizedKeyComputer nmkComputer;
+    private final int[] keyFields;
 
     public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            Comparator<ReferenceEntry> comparator) {
+            Comparator<ReferenceEntry> comparator, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         this.frameSize = frameSize;
         this.recordDescriptor = recordDescriptor;
         if (initSize < 1)
             throw new IllegalArgumentException();
         this.comparator = comparator;
+        this.nmkComputer = nmkComputer;
+        this.keyFields = keyFields;
         nItems = initSize;
         size = (initSize + 1) & 0xfffffffe;
         entries = new ReferenceEntry[size];
         runAvail = new BitSet(size);
         runAvail.set(0, initSize, true);
         for (int i = 0; i < size; i++) {
-            entries[i] = new ReferenceEntry(i, null, -1);
+            entries[i] = new ReferenceEntry(i, null, -1, keyFields, nmkComputer);
         }
     }
 
@@ -71,7 +76,7 @@
             entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
         }
         entry.getAccessor().reset(fta.getBuffer());
-        entry.setTupleIndex(tIndex);
+        entry.setTupleIndex(tIndex, keyFields, nmkComputer);
 
         add(entry);
         return entry.getRunid();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 380ba29..a2ef99a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -93,8 +94,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
-                printer, 0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -148,8 +149,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0, filter,
-                0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 48db176..faa55e8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,7 +90,8 @@
                         new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new int[] { 1 },
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                                .of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
+                                .of(UTF8StringPointable.FACTORY) }, new UTF8StringNormalizedKeyComputerFactory()),
+                sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -138,8 +140,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
-                printer, 0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
     }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
index c61c15b..61c5a1c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
@@ -156,8 +156,8 @@
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
         jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
                 new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
-                0, writeOperator, 0);
+                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+                sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
         IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 3e43fb1..baa4dc7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -163,8 +163,8 @@
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
         jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
                 new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
-                0, writeOperator, 0);
+                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+                sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
         IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a188f64 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -291,7 +291,7 @@
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), sorter, 0, writer, 0);
+                comparatorFactories, nkmFactory), sorter, 0, writer, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -355,10 +355,11 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), scanner, 0, writer, 0);
+                comparatorFactories, nkmFactory), scanner, 0, writer, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -739,8 +740,7 @@
          * connect operator descriptors
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
-                sort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9389f62..195e595 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -282,7 +282,7 @@
 
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
                 localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -503,7 +503,7 @@
 
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories, nkmFactory),
                 localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 287b797..cc90f2c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -241,8 +241,8 @@
          * connect the group-by operator
          */
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
@@ -440,8 +440,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4c7f91d..f599996 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,6 +91,7 @@
             UTF8StringPointable.FACTORY);
     private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
             UTF8StringPointable.FACTORY);
+    private INormalizedKeyComputerFactory nmkFactory = new UTF8StringNormalizedKeyComputerFactory();
 
     private void cleanupStores() throws IOException {
         FileUtils.forceMkdir(new File("teststore"));
@@ -211,7 +214,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
         IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory);
         spec.connect(joinWriterConn, sorter, 0, writer, 0);
 
         spec.addRoot(writer);
@@ -284,8 +288,8 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                sorter, 0, writer, 0);
+                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), sorter, 0, writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);
@@ -368,11 +372,11 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                sorter, 0, join, 0);
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), sorter, 0, join, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                join, 0, writer, 0);
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), join, 0, writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);
@@ -477,7 +481,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
         IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 9 },
-                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory);
         spec.connect(joinWriterConn, sorter, 0, writer, 0);
 
         spec.addRoot(writer);
@@ -573,7 +578,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
                 keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
-                        UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+                        UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories, nmkFactory), sorter, 0, join,
+                0);
 
         IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
         mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
@@ -581,7 +587,8 @@
         int[] mergeFields = new int[] { 9, 0 };
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
                 new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
-                        UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories), join, 0, writer, 0);
+                        UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories, nmkFactory), join, 0,
+                writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);