Merge remote-tracking branch 'origin' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 9955a63..3f1aae2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -44,9 +44,11 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -142,27 +144,32 @@
         int n = orderColumns.size();
         int[] sortFields = new int[n];
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[n];
-        {
-            int j = 0;
-            for (OrderColumn oc : orderColumns) {
-                LogicalVariable var = oc.getColumn();
-                sortFields[j] = opSchema.findVariable(var);
-                Object type = env.getVarType(var);
-                IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-                comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
-                j++;
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        int j = 0;
+        for (OrderColumn oc : orderColumns) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[j] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comparatorFactories[j] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            if (j == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, oc.getOrder() == OrderKind.ASC);
             }
+            j++;
         }
 
         IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields,
-                comparatorFactories);
+                comparatorFactories, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
-    
+
     public List<LogicalVariable> getPartitionFields() {
         return partitionFields;
     }
-    
+
     public List<OrderColumn> getOrderColumns() {
         return orderColumns;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index dc345ac..b03b99d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -39,9 +39,11 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -128,6 +130,10 @@
         IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
         IBinaryHashFunctionFactory[] hashFuns = new IBinaryHashFunctionFactory[n];
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
         for (int i = 0; i < n; i++) {
             sortFields[i] = opSchema.findVariable(sortColumns[i].getColumn());
             Object type = env.getVarType(sortColumns[i].getColumn());
@@ -135,9 +141,12 @@
             comps[i] = bcfp.getBinaryComparatorFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
             IBinaryHashFunctionFactoryProvider bhffp = context.getBinaryHashFunctionFactoryProvider();
             hashFuns[i] = bhffp.getBinaryHashFunctionFactory(type);
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, sortColumns[i].getOrder() == OrderKind.ASC);
+            }
         }
         ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(sortFields, hashFuns);
-        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps);
+        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.ONE);
     }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index b299e78..131eea0 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -24,7 +24,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
 
 public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -52,12 +52,12 @@
 
         return new AbstractOneInputOneOutputPushRuntime() {
 
-            FrameSorter frameSorter = null;
+            FrameSorterMergeSort frameSorter = null;
 
             @Override
             public void open() throws HyracksDataException {
                 if (frameSorter == null) {
-                    frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                    frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
                             outputRecordDesc);
                 }
                 frameSorter.reset();
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 04b91b2..31cd29b 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
 
@@ -53,8 +54,8 @@
     private final MarshalledWritable<Configuration> config;
     private final IInputSplitProviderFactory factory;
 
-    public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId, MarshalledWritable<Configuration> config,
-            IInputSplitProviderFactory factory) throws HyracksDataException {
+    public MapperOperatorDescriptor(IOperatorDescriptorRegistry spec, int jobId,
+            MarshalledWritable<Configuration> config, IInputSplitProviderFactory factory) throws HyracksDataException {
         super(spec, 0, 1);
         this.jobId = jobId;
         this.config = config;
@@ -94,7 +95,7 @@
 
             public void initBlock(int blockId) throws HyracksDataException {
                 runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
-                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit);
+                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
                 this.blockId = blockId;
             }
 
@@ -114,7 +115,8 @@
                     runGen.nextFrame(frame);
                     fta.reset(frame, true);
                     if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + frame.capacity() + ")");
+                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+                                + frame.capacity() + ")");
                     }
                 }
             }
@@ -224,7 +226,7 @@
                     comparators[i] = comparatorFactories[i].createBinaryComparator();
                 }
                 ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
-                        runGen.getRuns(), new int[] { 0 }, comparators,
+                        runGen.getRuns(), new int[] { 0 }, comparators, null,
                         helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
                 merger.process();
             }
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 30ba3ec..3bb78f9 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -108,7 +108,7 @@
             runs.add(rfw.createReader());
         }
         RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
-        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
                 recordDescriptor, framesLimit, rfw);
         merger.process();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
index 235b0d0..2dda9cc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergeFrameReader.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
@@ -31,18 +32,21 @@
     private final int nSenders;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDescriptor;
     private final IPartitionBatchManager pbm;
 
     private RunMergingFrameReader merger;
 
     public SortMergeFrameReader(IHyracksTaskContext ctx, int maxConcurrentMerges, int nSenders, int[] sortFields,
-            IBinaryComparator[] comparators, RecordDescriptor recordDescriptor, IPartitionBatchManager pbm) {
+            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDescriptor,
+            IPartitionBatchManager pbm) { // nmkComputer is only stored here; no null check is performed
         this.ctx = ctx;
         this.maxConcurrentMerges = maxConcurrentMerges;
         this.nSenders = nSenders;
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer; // may be null; handed to RunMergingFrameReader when the merger is built
         this.recordDescriptor = recordDescriptor;
         this.pbm = pbm;
     }
@@ -57,7 +61,7 @@
             List<IFrameReader> batch = new ArrayList<IFrameReader>();
             pbm.getNextBatch(batch, nSenders);
             merger = new RunMergingFrameReader(ctx, batch.toArray(new IFrameReader[nSenders]), inFrames, sortFields,
-                    comparators, recordDescriptor);
+                    comparators, nmkComputer, recordDescriptor);
         } else {
             // multi level merge.
             throw new HyracksDataException("Not yet supported");
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 32269bd..1cf402b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -39,19 +41,23 @@
     private final ITuplePartitionComputerFactory tpcf;
     private final int[] sortFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory nkcFactory;
     private final boolean stable;
 
-    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
-        this(spec, tpcf, sortFields, comparatorFactories, false);
+    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory) { // nkcFactory may be null; it is null-checked before use
+        this(spec, tpcf, sortFields, comparatorFactories, nkcFactory, false); // delegates with stable = false
     }
 
-    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, boolean stable) {
+    public MToNPartitioningMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory nkcFactory, boolean stable) { // full form: every field is set from an argument
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
         this.comparatorFactories = comparatorFactories;
+        this.nkcFactory = nkcFactory; // may be null; a null INormalizedKeyComputer is then given to SortMergeFrameReader
         this.stable = stable;
     }
 
@@ -71,9 +77,10 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
+        INormalizedKeyComputer nmkComputer = nkcFactory == null ? null : nkcFactory.createNormalizedKeyComputer();
         IPartitionBatchManager pbm = new NonDeterministicPartitionBatchManager(nProducerPartitions);
         IFrameReader sortMergeFrameReader = new SortMergeFrameReader(ctx, nProducerPartitions, nProducerPartitions,
-                sortFields, comparators, recordDesc, pbm);
+                sortFields, comparators, nmkComputer, recordDesc, pbm);
         BitSet expectedPartitions = new BitSet();
         expectedPartitions.set(0, nProducerPartitions);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sortMergeFrameReader, pbm);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..bb3d0f3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -25,6 +25,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,6 +49,7 @@
     private final Object stateId;
     private final int[] keyFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final AggregateState aggregateState;
     private final ArrayTupleBuilder tupleBuilder;
     private final int[] storedKeys;
@@ -76,7 +79,7 @@
     private final FrameTupleAccessor outFrameAccessor;
 
     ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
-            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nmkFactory, int[] keyFields,
             IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         this.stateId = stateId;
@@ -85,6 +88,7 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
+        this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
         int[] keyFieldsInPartialResults = new int[keyFields.length];
         for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
             keyFieldsInPartialResults[i] = i;
@@ -181,7 +185,7 @@
             FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
             Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
             ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
-                    runNumber, comparator);
+                    runNumber, comparator, keyFields, nmkComputer);
             /**
              * current tuple index in each run
              */
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index b914702..9296ffa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -130,8 +130,8 @@
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
             return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
-                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, keyFields, mergerFactory, isOutputSorted,
-                    framesLimit, recordDescriptors[0]);
+                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, firstNormalizerFactory, keyFields,
+                    mergerFactory, isOutputSorted, framesLimit, recordDescriptors[0]);
         }
 
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java
new file mode 100644
index 0000000..5c96bd0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Algorithm.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+public enum Algorithm { // selects the IFrameSorter implementation that ExternalSortRunGenerator instantiates
+    QUICK_SORT, // ExternalSortRunGenerator builds a FrameSorterQuickSort for any value other than MERGE_SORT
+    MERGE_SORT // ExternalSortRunGenerator builds a FrameSorterMergeSort for this value
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index f80a82c..e1315e7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -51,6 +52,15 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final int framesLimit;
 
+    private Algorithm alg = Algorithm.MERGE_SORT; // initial value; overwritten by the constructor that takes an Algorithm
+
+    public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, Algorithm alg) { // overload that also selects the sort algorithm
+        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        this.alg = alg; // replaces the MERGE_SORT default; passed to ExternalSortRunGenerator when the sort activity opens
+    }
+
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
         this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor);
@@ -86,7 +96,7 @@
 
     public static class SortTaskState extends AbstractStateObject {
         private List<IFrameReader> runs;
-        private FrameSorter frameSorter;
+        private IFrameSorter frameSorter;
 
         public SortTaskState() {
         }
@@ -122,7 +132,7 @@
                 @Override
                 public void open() throws HyracksDataException {
                     runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
-                            comparatorFactories, recordDescriptors[0], framesLimit);
+                            comparatorFactories, recordDescriptors[0], alg, framesLimit);
                     runGen.open();
                 }
 
@@ -166,14 +176,16 @@
                     SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             SORT_ACTIVITY_ID), partition));
                     List<IFrameReader> runs = state.runs;
-                    FrameSorter frameSorter = state.frameSorter;
+                    IFrameSorter frameSorter = state.frameSorter;
                     IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
+                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
                     int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
-                            comparators, recordDescriptors[0], necessaryFrames, writer);
+                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
                     merger.process();
                 }
             };
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index b149e30..3736fca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -30,15 +30,21 @@
 
 public class ExternalSortRunGenerator implements IFrameWriter {
     private final IHyracksTaskContext ctx;
-    private final FrameSorter frameSorter;
+    private final IFrameSorter frameSorter;
     private final List<IFrameReader> runs;
     private final int maxSortFrames;
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDesc, int framesLimit) throws HyracksDataException {
+            RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
         this.ctx = ctx;
-        frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
+        if (alg == Algorithm.MERGE_SORT) { // alg decides which IFrameSorter implementation backs this generator
+            frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                    recordDesc);
+        } else { // every other value, including null, falls through to FrameSorterQuickSort
+            frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                    recordDesc);
+        }
         runs = new LinkedList<IFrameReader>();
         maxSortFrames = framesLimit - 1;
     }
@@ -87,7 +93,7 @@
     public void fail() throws HyracksDataException {
     }
 
-    public FrameSorter getFrameSorter() {
+    public IFrameSorter getFrameSorter() { // returns the sorter instance created in the constructor
         return frameSorter;
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 510dfd6..eaf4162 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -59,6 +60,7 @@
     private final List<IFrameReader> runs;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final int framesLimit;
     private final IFrameWriter writer;
@@ -66,8 +68,8 @@
     private ByteBuffer outFrame;
     private FrameTupleAppender outFrameAppender;
 
-    private FrameSorter frameSorter; // Used in External sort, no replacement
-                                     // selection
+    private IFrameSorter frameSorter; // Used in External sort, no replacement
+                                      // selection
     private FrameTupleAccessor outFrameAccessor; // Used in External sort, with
                                                  // replacement selection
     private final int outputLimit; // Used in External sort, with replacement
@@ -76,14 +78,15 @@
                              // selection and limit on output size
 
     // Constructor for external sort, no replacement selection
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
-            IFrameWriter writer) {
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+            RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
         this.ctx = ctx;
         this.frameSorter = frameSorter;
         this.runs = new LinkedList<IFrameReader>(runs);
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
@@ -92,11 +95,13 @@
 
     // Constructor for external sort with replacement selection
     public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
-            IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
+            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
+            int framesLimit, IFrameWriter writer) {
         this.ctx = ctx;
         this.runs = new LinkedList<IFrameReader>(runs);
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
@@ -162,7 +167,7 @@
 
     private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
         RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
-                recordDesc);
+                nmkComputer, recordDesc);
         merger.open();
         try {
             while (merger.nextFrame(outFrame)) {
@@ -263,7 +268,7 @@
                 runCursors[i] = runs.get(i);
             }
             RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
-                    comparators, recordDesc);
+                    comparators, nmkComputer, recordDesc);
             merger.open();
             try {
                 while (merger.nextFrame(outFrame)) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
similarity index 97%
rename from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
rename to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index a6bb4e2..cc0f1ef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -31,7 +31,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 
-public class FrameSorter {
+public class FrameSorterMergeSort implements IFrameSorter {
     private final IHyracksTaskContext ctx;
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
@@ -50,7 +50,7 @@
     private int[] tPointersTemp;
     private int tupleCount;
 
-    public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
         this.ctx = ctx;
@@ -69,15 +69,18 @@
         dataFrameCount = 0;
     }
 
+    @Override
     public void reset() {
         dataFrameCount = 0;
         tupleCount = 0;
     }
 
+    @Override
     public int getFrameCount() {
         return dataFrameCount;
     }
 
+    @Override
     public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer copyFrame;
         if (dataFrameCount == buffers.size()) {
@@ -90,6 +93,7 @@
         ++dataFrameCount;
     }
 
+    @Override
     public void sortFrames() {
         int nBuffers = dataFrameCount;
         tupleCount = 0;
@@ -123,6 +127,7 @@
         }
     }
 
+    @Override
     public void flushFrames(IFrameWriter writer) throws HyracksDataException {
         appender.reset(outFrame, true);
         for (int ptr = 0; ptr < tupleCount; ++ptr) {
@@ -241,6 +246,7 @@
         return 0;
     }
 
+    @Override
     public void close() {
         this.buffers.clear();
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
similarity index 71%
copy from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
copy to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index a6bb4e2..083f4a7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -29,9 +29,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 
-public class FrameSorter {
+public class FrameSorterQuickSort implements IFrameSorter {
     private final IHyracksTaskContext ctx;
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
@@ -47,10 +46,9 @@
 
     private int dataFrameCount;
     private int[] tPointers;
-    private int[] tPointersTemp;
     private int tupleCount;
 
-    public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) throws HyracksDataException {
         this.ctx = ctx;
@@ -69,15 +67,18 @@
         dataFrameCount = 0;
     }
 
+    @Override
     public void reset() {
         dataFrameCount = 0;
         tupleCount = 0;
     }
 
+    @Override
     public int getFrameCount() {
         return dataFrameCount;
     }
 
+    @Override
     public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer copyFrame;
         if (dataFrameCount == buffers.size()) {
@@ -90,6 +91,7 @@
         ++dataFrameCount;
     }
 
+    @Override
     public void sortFrames() {
         int nBuffers = dataFrameCount;
         tupleCount = 0;
@@ -118,11 +120,11 @@
             }
         }
         if (tupleCount > 0) {
-            tPointersTemp = new int[tPointers.length];
-            sort(0, tupleCount);
+            sort(tPointers, 0, tupleCount);
         }
     }
 
+    @Override
     public void flushFrames(IFrameWriter writer) throws HyracksDataException {
         appender.reset(outFrame, true);
         for (int ptr = 0; ptr < tupleCount; ++ptr) {
@@ -145,73 +147,75 @@
         }
     }
 
-    private void sort(int offset, int length) {
-        int step = 1;
-        int len = length;
-        int end = offset + len;
-        /** bottom-up merge */
-        while (step < len) {
-            /** merge */
-            for (int i = offset; i < end; i += 2 * step) {
-                int next = i + step;
-                if (next < end) {
-                    merge(i, next, step, Math.min(step, end - next));
-                } else {
-                    System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4);
+    private void sort(int[] tPointers, int offset, int length) { // quicksorts records [offset, offset + length)
+        int m = offset + (length >> 1); // pivot: the record in the middle of the range
+        int mi = tPointers[m * 4]; // pivot fields 0, 1 and 3 are copied out, so later swaps cannot alter the pivot
+        int mj = tPointers[m * 4 + 1];
+        int mv = tPointers[m * 4 + 3];
+        // While partitioning: [offset, a) == pivot, [a, b) < pivot, (c, d] > pivot, (d, offset + length) == pivot.
+        int a = offset;
+        int b = a;
+        int c = offset + length - 1;
+        int d = c;
+        while (true) {
+            while (b <= c) { // scan rightwards past records <= pivot
+                int cmp = compare(tPointers, b, mi, mj, mv);
+                if (cmp > 0) {
+                    break;
                 }
+                if (cmp == 0) {
+                    swap(tPointers, a++, b); // park a pivot-equal record at the left end
+                }
+                ++b;
             }
-            /** prepare next phase merge */
-            step *= 2;
-            int[] tmp = tPointersTemp;
-            tPointersTemp = tPointers;
-            tPointers = tmp;
-        }
-    }
-
-    /** Merge two subarrays into one */
-    private void merge(int start1, int start2, int len1, int len2) {
-        int targetPos = start1;
-        int pos1 = start1;
-        int pos2 = start2;
-        int end1 = start1 + len1 - 1;
-        int end2 = start2 + len2 - 1;
-        while (pos1 <= end1 && pos2 <= end2) {
-            int cmp = compare(pos1, pos2);
-            if (cmp <= 0) {
-                copy(pos1, targetPos);
-                pos1++;
-            } else {
-                copy(pos2, targetPos);
-                pos2++;
+            while (c >= b) { // scan leftwards past records >= pivot
+                int cmp = compare(tPointers, c, mi, mj, mv);
+                if (cmp < 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, c, d--); // park a pivot-equal record at the right end
+                }
+                --c;
             }
-            targetPos++;
+            if (b > c)
+                break;
+            swap(tPointers, b++, c--); // record b is > pivot and record c is < pivot: exchange them
         }
-        if (pos1 <= end1) {
-            int rest = end1 - pos1 + 1;
-            System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
+        // Move the pivot-equal runs from both ends of the range into the middle.
+        int s;
+        int n = offset + length;
+        s = Math.min(a - offset, b - a);
+        vecswap(tPointers, offset, b - s, s);
+        s = Math.min(d - c, n - d - 1);
+        vecswap(tPointers, b, n - s, s);
+        // Recurse only into the strictly-smaller and strictly-larger partitions.
+        if ((s = b - a) > 1) {
+            sort(tPointers, offset, s);
         }
-        if (pos2 <= end2) {
-            int rest = end2 - pos2 + 1;
-            System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
+        if ((s = d - c) > 1) {
+            sort(tPointers, n - s, s);
         }
     }
 
-    private void copy(int src, int dest) {
-        tPointersTemp[dest * 4] = tPointers[src * 4];
-        tPointersTemp[dest * 4 + 1] = tPointers[src * 4 + 1];
-        tPointersTemp[dest * 4 + 2] = tPointers[src * 4 + 2];
-        tPointersTemp[dest * 4 + 3] = tPointers[src * 4 + 3];
+    private void swap(int x[], int a, int b) { // exchanges records a and b of x
+        for (int i = 0; i < 4; ++i) { // each record occupies 4 consecutive ints
+            int t = x[a * 4 + i];
+            x[a * 4 + i] = x[b * 4 + i];
+            x[b * 4 + i] = t;
+        }
     }
 
-    private int compare(int tp1, int tp2) {
+    private void vecswap(int x[], int a, int b, int n) { // swaps n consecutive records: [a, a + n) with [b, b + n)
+        for (int i = 0; i < n; i++, a++, b++) {
+            swap(x, a, b);
+        }
+    }
+
+    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
         int i1 = tPointers[tp1 * 4];
         int j1 = tPointers[tp1 * 4 + 1];
         int v1 = tPointers[tp1 * 4 + 3];
-
-        int tp2i = tPointers[tp2 * 4];
-        int tp2j = tPointers[tp2 * 4 + 1];
-        int tp2v = tPointers[tp2 * 4 + 3];
-
         if (v1 != tp2v) {
             return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
         }
@@ -225,12 +229,12 @@
         fta2.reset(buf2);
         for (int f = 0; f < comparators.length; ++f) {
             int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf1.array(), j1 + (fIdx - 1) * 4);
-            int f1End = IntSerDeUtils.getInt(buf1.array(), j1 + fIdx * 4);
+            int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+            int f1End = buf1.getInt(j1 + fIdx * 4);
             int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
             int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf2.array(), j2 + (fIdx - 1) * 4);
-            int f2End = IntSerDeUtils.getInt(buf2.array(), j2 + fIdx * 4);
+            int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+            int f2End = buf2.getInt(j2 + fIdx * 4);
             int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
             int l2 = f2End - f2Start;
             int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
@@ -241,7 +245,8 @@
         return 0;
     }
 
+    @Override
     public void close() {
         this.buffers.clear();
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
new file mode 100644
index 0000000..6778852
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/** Accumulates frames, sorts the tuples they contain, and writes the tuples to an {@link IFrameWriter}. */
+public interface IFrameSorter {
+    /** Resets the sorter so that it holds zero frames and zero tuples. */
+    public void reset();
+    /** Returns the number of frames inserted since the last {@link #reset()}. */
+    public int getFrameCount();
+    /** Adds a frame whose tuples are to be sorted. */
+    public void insertFrame(ByteBuffer buffer) throws HyracksDataException;
+    /** Sorts the tuples of all frames inserted since the last {@link #reset()}. */
+    public void sortFrames();
+    /** Writes the tuples to {@code writer} in their current order; call {@link #sortFrames()} first. */
+    public void flushFrames(IFrameWriter writer) throws HyracksDataException;
+    /** Releases the frames held by the sorter. */
+    public void close();
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 04c82af..6fa21b5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -77,7 +77,7 @@
     }
 
     public static class SortTaskState extends AbstractStateObject {
-        private FrameSorter frameSorter;
+        private FrameSorterMergeSort frameSorter;
 
         public SortTaskState() {
         }
@@ -111,7 +111,7 @@
                 @Override
                 public void open() throws HyracksDataException {
                     state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
-                    state.frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+                    state.frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
                             comparatorFactories, recordDescriptors[0]);
                     state.frameSorter.reset();
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index bff2e21..ef1ae88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -201,9 +202,11 @@
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
 
+                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
                     int necessaryFrames = Math.min(runs.size() + 2, memSize);
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
-                            comparators, recordDescriptors[0], necessaryFrames, writer);
+                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
 
                     merger.processWithReplacementSelection();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index 16b3c12..00fbe9b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -35,6 +36,7 @@
     private final List<ByteBuffer> inFrames;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
+    private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final FrameTupleAppender outFrameAppender;
     private ReferencedPriorityQueue topTuples;
@@ -42,12 +44,14 @@
     private FrameTupleAccessor[] tupleAccessors;
 
     public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
-            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc) {
+            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+            RecordDescriptor recordDesc) {
         this.ctx = ctx;
         this.runCursors = runCursors;
         this.inFrames = inFrames;
         this.sortFields = sortFields;
         this.comparators = comparators;
+        this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
     }
@@ -56,7 +60,8 @@
     public void open() throws HyracksDataException {
         tupleAccessors = new FrameTupleAccessor[runCursors.length];
         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
+                sortFields, nmkComputer);
         tupleIndexes = new int[runCursors.length];
         for (int i = 0; i < runCursors.length; i++) {
             tupleIndexes[i] = 0;
@@ -143,21 +148,25 @@
     private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
         return new Comparator<ReferenceEntry>() {
             public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                int nmk1 = tp1.getNormalizedKey();
+                int nmk2 = tp2.getNormalizedKey();
+                if (nmk1 != nmk2) {
+                    // Normalized keys are computed from the first sort field and are ordered as
+                    // unsigned 32-bit values, the same way the in-memory frame sorter compares them.
+                    return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
+                }
+                // Equal normalized keys are inconclusive: fall through to the field comparators.
+
                 FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
                 FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                int j1 = tp1.getTupleIndex();
-                int j2 = tp2.getTupleIndex();
                 byte[] b1 = fta1.getBuffer().array();
                 byte[] b2 = fta2.getBuffer().array();
+                int[] tPointers1 = tp1.getTPointers();
+                int[] tPointers2 = tp2.getTPointers();
+
                 for (int f = 0; f < sortFields.length; ++f) {
-                    int fIdx = sortFields[f];
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    int c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
+                            tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
                     if (c != 0) {
                         return c;
                     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
index c1efa91..c06b50c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -14,18 +14,24 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.util;
 
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class ReferenceEntry {
     private final int runid;
     private FrameTupleAccessor acccessor;
     private int tupleIndex;
+    private int[] tPointers;
 
-    public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex) {
+    public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+            INormalizedKeyComputer nmkComputer) {
         super();
         this.runid = runid;
         this.acccessor = fta;
-        this.tupleIndex = tupleIndex;
+        this.tPointers = new int[1 + 2 * keyFields.length];
+        if (fta != null) {
+            initTPointer(fta, tupleIndex, keyFields, nmkComputer);
+        }
     }
 
     public int getRunid() {
@@ -40,11 +46,38 @@
         this.acccessor = fta;
     }
 
+    public int[] getTPointers() {
+        return tPointers;
+    }
+
     public int getTupleIndex() {
         return tupleIndex;
     }
 
-    public void setTupleIndex(int tupleIndex) {
+    public int getNormalizedKey() {
+        return tPointers[0];
+    }
+
+    public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
+        initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
+    }
+
+    private void initTPointer(FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
+            INormalizedKeyComputer nmkComputer) {
         this.tupleIndex = tupleIndex;
+        byte[] b1 = fta.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = keyFields[f];
+            tPointers[2 * f + 1] = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength()
+                    + fta.getFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + 2] = fta.getFieldEndOffset(tupleIndex, fIdx) - fta.getFieldStartOffset(tupleIndex, fIdx);
+            if (f == 0) {
+                if (nmkComputer != null) {
+                    tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
+                } else {
+                    tPointers[0] = 0;
+                }
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
index 7767ace..225f583 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/ReferencedPriorityQueue.java
@@ -18,6 +18,7 @@
 import java.util.BitSet;
 import java.util.Comparator;
 
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
@@ -30,21 +31,25 @@
     private int nItems;
 
     private final Comparator<ReferenceEntry> comparator;
+    private final INormalizedKeyComputer nmkComputer;
+    private final int[] keyFields;
 
     public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
-            Comparator<ReferenceEntry> comparator) {
+            Comparator<ReferenceEntry> comparator, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         this.frameSize = frameSize;
         this.recordDescriptor = recordDescriptor;
         if (initSize < 1)
             throw new IllegalArgumentException();
         this.comparator = comparator;
+        this.nmkComputer = nmkComputer;
+        this.keyFields = keyFields;
         nItems = initSize;
         size = (initSize + 1) & 0xfffffffe;
         entries = new ReferenceEntry[size];
         runAvail = new BitSet(size);
         runAvail.set(0, initSize, true);
         for (int i = 0; i < size; i++) {
-            entries[i] = new ReferenceEntry(i, null, -1);
+            entries[i] = new ReferenceEntry(i, null, -1, keyFields, nmkComputer);
         }
     }
 
@@ -71,7 +76,7 @@
             entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
         }
         entry.getAccessor().reset(fta.getBuffer());
-        entry.setTupleIndex(tIndex);
+        entry.setTupleIndex(tIndex, keyFields, nmkComputer);
 
         add(entry);
         return entry.getRunid();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 380ba29..a2ef99a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -93,8 +94,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
-                printer, 0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -148,8 +149,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0, filter,
-                0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, filter, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), filter, 0, printer, 0);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 48db176..faa55e8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,7 +90,8 @@
                         new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new int[] { 1 },
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                                .of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
+                                .of(UTF8StringPointable.FACTORY) }, new UTF8StringNormalizedKeyComputerFactory()),
+                sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -138,8 +140,8 @@
                         PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
                         new IBinaryComparatorFactory[] {
                                 PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
-                printer, 0);
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
     }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
index c61c15b..61c5a1c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/dataflow/DataflowTest.java
@@ -156,8 +156,8 @@
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
         jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
                 new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
-                0, writeOperator, 0);
+                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+                sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
         IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
index 3e43fb1..baa4dc7 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -163,8 +163,8 @@
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0);
         jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory(
                 new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }), sortOperator,
-                0, writeOperator, 0);
+                new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null),
+                sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
         IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST,
diff --git a/pregelix/pregelix-benchmark/pom.xml b/pregelix/pregelix-benchmark/pom.xml
index 4d7d456..c1344ea 100644
--- a/pregelix/pregelix-benchmark/pom.xml
+++ b/pregelix/pregelix-benchmark/pom.xml
@@ -12,6 +12,42 @@
 	<artifactId>pregelix-benchmark</artifactId>
 	<name>pregelix-benchmark</name>
 	<url>http://maven.apache.org</url>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
+				</configuration>
+			</plugin>
+			<!-- Binds the assembly "single" goal to the package phase, using the jar-with-dependencies descriptor. -->
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.2-beta-5</version>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-my-jar-with-dependencies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+
 	<dependencies>
 		<dependency>
 			<groupId>junit</groupId>
@@ -26,6 +62,12 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.giraph</groupId>
+			<artifactId>giraph-examples</artifactId>
+			<version>1.0.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>hyracks-hdfs-core</artifactId>
 			<version>0.2.10-SNAPSHOT</version>
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
deleted file mode 100644
index 04c29de..0000000
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.benchmark;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.VLongWritable;
-
-/**
- * Demonstrates the basic Pregel PageRank implementation.
- */
-public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-
-    public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
-    private final DoubleWritable vertexValue = new DoubleWritable();
-    private final DoubleWritable msg = new DoubleWritable();
-    private int maxIteration = -1;
-
-    @Override
-    public void compute(Iterable<DoubleWritable> msgIterator) {
-        if (maxIteration < 0) {
-            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
-        }
-        if (getSuperstep() == 1) {
-            vertexValue.set(1.0 / getTotalNumVertices());
-        }
-        if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
-            double sum = 0;
-            for (DoubleWritable msg : msgIterator) {
-                sum += msg.get();
-            }
-            vertexValue.set((0.15 / getTotalNumVertices()) + 0.85 * sum);
-        }
-
-        if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
-            long edges = getNumEdges();
-            msg.set(vertexValue.get() / edges);
-            sendMessageToAllEdges(msg);
-        } else {
-            voteToHalt();
-        }
-    }
-
-}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
similarity index 60%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
index 3d85f66..b290907 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
@@ -12,7 +12,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.benchmark;
+
+package edu.uci.ics.pregelix.benchmark.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -24,14 +25,13 @@
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.MapMutableEdge;
 import org.apache.giraph.io.formats.TextVertexInputFormat;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextCCInputFormat extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
 
     @Override
     public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,25 +39,25 @@
             String[] items;
 
             @Override
-            protected VLongWritable getId(Text line) throws IOException {
+            protected LongWritable getId(Text line) throws IOException {
                 items = line.toString().split(" ");
-                return new VLongWritable(Long.parseLong(items[0]));
+                return new LongWritable(Long.parseLong(items[0]));
             }
 
             @Override
-            protected DoubleWritable getValue(Text line) throws IOException {
+            protected LongWritable getValue(Text line) throws IOException {
                 return null;
             }
 
             @Override
-            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
-                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
-                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+            protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+                Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
                 for (int i = 1; i < items.length; i++) {
-                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
                 }
-                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
-                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
                     edge.setEntry(entry);
                     edge.setValue(null);
                     edges.add(edge);
@@ -67,4 +67,5 @@
 
         };
     }
+
 }
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java
new file mode 100644
index 0000000..770c6e1
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextCCOutputFormat extends TextVertexOutputFormat<LongWritable, LongWritable, NullWritable> {
+    /** Supplies the writer that emits one text line per vertex; {@code context} is not read here. */
+    @Override
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new TextVertexWriterToEachLine() {
+            /** Renders a vertex as its id, one space, then its value, via string concatenation. */
+            @Override
+            protected Text convertVertexToLine(Vertex<LongWritable, LongWritable, NullWritable, ?> vertex)
+                    throws IOException {
+                return new Text(vertex.getId() + " " + vertex.getValue());
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
similarity index 64%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
index 3d85f66..38eef3a 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.benchmark;
+package edu.uci.ics.pregelix.benchmark.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,13 +25,13 @@
 import org.apache.giraph.edge.MapMutableEdge;
 import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextPRInputFormat extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable> {
 
     @Override
     public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,9 @@
             String[] items;
 
             @Override
-            protected VLongWritable getId(Text line) throws IOException {
+            protected LongWritable getId(Text line) throws IOException {
                 items = line.toString().split(" ");
-                return new VLongWritable(Long.parseLong(items[0]));
+                return new LongWritable(Long.parseLong(items[0]));
             }
 
             @Override
@@ -50,14 +50,14 @@
             }
 
             @Override
-            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
-                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
-                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+            protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+                Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
                 for (int i = 1; i < items.length; i++) {
-                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
                 }
-                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
-                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
                     edge.setEntry(entry);
                     edge.setValue(null);
                     edges.add(edge);
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java
new file mode 100644
index 0000000..b14de6f
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextPROutputFormat extends TextVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {
+    /** Supplies the writer that emits one text line per vertex; {@code context} is not read here. */
+    @Override
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new TextVertexWriterToEachLine() {
+            /** Renders a vertex as its id, one space, then its value, via string concatenation. */
+            @Override
+            protected Text convertVertexToLine(Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+                    throws IOException {
+                return new Text(vertex.getId() + " " + vertex.getValue());
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
similarity index 65%
rename from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
rename to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
index 3d85f66..953e93c 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
@@ -12,7 +12,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.benchmark;
+
+package edu.uci.ics.pregelix.benchmark.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,13 +26,12 @@
 import org.apache.giraph.edge.MapMutableEdge;
 import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextSPInputFormat extends TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
 
     @Override
     public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,9 @@
             String[] items;
 
             @Override
-            protected VLongWritable getId(Text line) throws IOException {
+            protected LongWritable getId(Text line) throws IOException {
                 items = line.toString().split(" ");
-                return new VLongWritable(Long.parseLong(items[0]));
+                return new LongWritable(Long.parseLong(items[0]));
             }
 
             @Override
@@ -50,16 +50,16 @@
             }
 
             @Override
-            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
-                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
-                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+            protected Iterable<Edge<LongWritable, DoubleWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, DoubleWritable>> edges = new ArrayList<Edge<LongWritable, DoubleWritable>>();
+                Map<LongWritable, DoubleWritable> edgeMap = new HashMap<LongWritable, DoubleWritable>();
                 for (int i = 1; i < items.length; i++) {
-                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
                 }
-                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
-                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                for (Entry<LongWritable, DoubleWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, DoubleWritable> edge = new MapMutableEdge<LongWritable, DoubleWritable>();
                     edge.setEntry(entry);
-                    edge.setValue(null);
+                    edge.setValue(new DoubleWritable(1.0));
                     edges.add(edge);
                 }
                 return edges;
@@ -67,4 +67,5 @@
 
         };
     }
+
 }
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java
new file mode 100644
index 0000000..0a70b3c
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.io2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MapMutableEdge;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextCCInputFormat2 extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
+    /** Supplies a reader that builds one vertex per text line; {@code split} and {@code context} are not read here. */
+    @Override
+    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextVertexReaderFromEachLine() {
+            String[] items;
+            /** Splits the line on tabs, caches field 1 split on spaces in {@code items}, and returns field 0 as the id. */
+            @Override
+            protected LongWritable getId(Text line) throws IOException {
+                String[] kv = line.toString().split("\t");
+                items = kv[1].split(" ");
+                return new LongWritable(Long.parseLong(kv[0]));
+            }
+            /** Always returns null: no vertex value is parsed from the line. */
+            @Override
+            protected LongWritable getValue(Text line) throws IOException {
+                return null;
+            }
+            /** Builds edges from {@code items[1..]} (index 0 is skipped); ignores {@code line} and reads what getId cached. */
+            @Override
+            protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+                Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
+                for (int i = 1; i < items.length; i++) {
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
+                }
+                for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
+                    edge.setEntry(entry);
+                    edge.setValue(null);
+                    edges.add(edge);
+                }
+                return edges;
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
similarity index 62%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
index 3d85f66..63a4519 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.benchmark;
+package edu.uci.ics.pregelix.benchmark.io2;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,13 +25,13 @@
 import org.apache.giraph.edge.MapMutableEdge;
 import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextPRInputFormat2 extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable> {
 
     @Override
     public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,10 @@
             String[] items;
 
             @Override
-            protected VLongWritable getId(Text line) throws IOException {
-                items = line.toString().split(" ");
-                return new VLongWritable(Long.parseLong(items[0]));
+            protected LongWritable getId(Text line) throws IOException {
+                String[] kv = line.toString().split("\t");
+                items = kv[1].split(" ");
+                return new LongWritable(Long.parseLong(items[0])); // NOTE(review): CC2/SP2 readers use kv[0] here - confirm
             }
 
             @Override
@@ -50,14 +51,14 @@
             }
 
             @Override
-            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
-                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
-                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+            protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+                Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
                 for (int i = 1; i < items.length; i++) {
-                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
                 }
-                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
-                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
                     edge.setEntry(entry);
                     edge.setValue(null);
                     edges.add(edge);
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
similarity index 63%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
index 3d85f66..fdb1061 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
@@ -12,7 +12,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.benchmark;
+
+package edu.uci.ics.pregelix.benchmark.io2;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,13 +26,12 @@
 import org.apache.giraph.edge.MapMutableEdge;
 import org.apache.giraph.io.formats.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextSPInputFormat2 extends TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
 
     @Override
     public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,10 @@
             String[] items;
 
             @Override
-            protected VLongWritable getId(Text line) throws IOException {
-                items = line.toString().split(" ");
-                return new VLongWritable(Long.parseLong(items[0]));
+            protected LongWritable getId(Text line) throws IOException {
+                String[] kv = line.toString().split("\t");
+                items = kv[1].split(" ");
+                return new LongWritable(Long.parseLong(kv[0]));
             }
 
             @Override
@@ -50,16 +51,16 @@
             }
 
             @Override
-            protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
-                List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
-                Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+            protected Iterable<Edge<LongWritable, DoubleWritable>> getEdges(Text line) throws IOException {
+                List<Edge<LongWritable, DoubleWritable>> edges = new ArrayList<Edge<LongWritable, DoubleWritable>>();
+                Map<LongWritable, DoubleWritable> edgeMap = new HashMap<LongWritable, DoubleWritable>();
                 for (int i = 1; i < items.length; i++) {
-                    edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+                    edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
                 }
-                for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
-                    MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+                for (Entry<LongWritable, DoubleWritable> entry : edgeMap.entrySet()) {
+                    MapMutableEdge<LongWritable, DoubleWritable> edge = new MapMutableEdge<LongWritable, DoubleWritable>();
                     edge.setEntry(entry);
-                    edge.setValue(null);
+                    edge.setValue(new DoubleWritable(1.0));
                     edges.add(edge);
                 }
                 return edges;
@@ -67,4 +68,5 @@
 
         };
     }
+
 }
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java
new file mode 100644
index 0000000..3789d6d
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+public class ConnectedComponentsVertex extends Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
+    /**
+     * Lowers this vertex's value to the smallest id seen so far and passes that id on to neighbors.
+     * In superstep 0 the candidates are the target ids of the outgoing edges; afterwards they are the
+     * values of the incoming messages. Every call ends with voteToHalt().
+     * @param messages
+     *            Iterable of messages whose values are treated as candidate ids; not read in superstep 0.
+     * @throws IOException
+     */
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException {
+        long currentComponent = getValue().get();
+
+        // Superstep 0: messages are ignored; the minimum is taken over the target ids of the outgoing edges.
+        if (getSuperstep() == 0) {
+            for (Edge<LongWritable, NullWritable> edge : getEdges()) {
+                long neighbor = edge.getTargetVertexId().get();
+                if (neighbor < currentComponent) {
+                    currentComponent = neighbor;
+                }
+            }
+            // Send only if a neighbor id was smaller than the stored value (presumably the vertex's own id here -- confirm).
+            if (currentComponent != getValue().get()) {
+                setValue(new LongWritable(currentComponent));
+                for (Edge<LongWritable, NullWritable> edge : getEdges()) {
+                    LongWritable neighbor = edge.getTargetVertexId();
+                    if (neighbor.get() > currentComponent) { // neighbors whose id is <= the new value are skipped
+                        sendMessage(neighbor, getValue());
+                    }
+                }
+            }
+
+            voteToHalt();
+            return;
+        }
+
+        boolean changed = false;
+        // Later supersteps: look for a message value smaller than the stored component id.
+        for (LongWritable message : messages) {
+            long candidateComponent = message.get();
+            if (candidateComponent < currentComponent) {
+                currentComponent = candidateComponent;
+                changed = true;
+            }
+        }
+
+        // Only when a smaller id arrived: store it and pass it on via sendMessageToAllEdges.
+        if (changed) {
+            setValue(new LongWritable(currentComponent));
+            sendMessageToAllEdges(getValue());
+        }
+        voteToHalt();
+    }
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java
new file mode 100644
index 0000000..86e90dd
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.examples.RandomWalkVertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * The PageRank algorithm with uniform transition probabilities: each edge gets stateProbability / getNumEdges().
+ * Background: http://en.wikipedia.org/wiki/PageRank
+ */
+public class PageRankVertex extends RandomWalkVertex<NullWritable> {
+    /** Splits stateProbability evenly over this vertex's edges; the edge argument itself is not read. */
+    @Override
+    protected double transitionProbability(double stateProbability, Edge<LongWritable, NullWritable> edge) {
+        return stateProbability / getNumEdges(); // NOTE(review): Infinity/NaN if getNumEdges() is 0 -- confirm callers never hit that
+    }
+
+    @Override
+    protected double recompute(Iterable<DoubleWritable> partialRanks, double teleportationProbability) {
+        // Returns (1 - teleportationProbability) * (neighbor sum + dangling share) + teleportationProbability / vertex count.
+        // rank contribution from incident neighbors (MathUtils.sum over partialRanks)
+        double rankFromNeighbors = MathUtils.sum(partialRanks);
+        // rank contribution from dangling vertices: getDanglingProbability() divided by the total vertex count
+        double danglingContribution = getDanglingProbability() / getTotalNumVertices();
+
+        // recompute rank; '/' binds tighter than '+', so only teleportationProbability is divided by the vertex count
+        return (1d - teleportationProbability) * (rankFromNeighbors + danglingContribution) + teleportationProbability
+                / getTotalNumVertices();
+    }
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java
new file mode 100644
index 0000000..755a3d0
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Shortest paths algorithm: a vertex stores the smallest distance it has seen and relays it, plus each edge's value.
+ */
+public class ShortestPathsVertex extends Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+    /** Configuration key that isSource() passes to getConf().getLong to look up the source id. */
+    public static final String SOURCE_ID = "giraph.shortestPathsBenchmark.sourceId";
+    /** Default source id, passed as the fallback argument of that getLong lookup. */
+    public static final long SOURCE_ID_DEFAULT = 1;
+
+    private boolean isSource() {
+        return getId().get() == getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+    }
+
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+        if (getSuperstep() == 0) {
+            setValue(new DoubleWritable(Double.MAX_VALUE));
+        }
+        // Candidate distance: starts at 0 for the source (Double.MAX_VALUE otherwise), lowered by any smaller message.
+        double minDist = isSource() ? 0d : Double.MAX_VALUE;
+        for (DoubleWritable message : messages) {
+            minDist = Math.min(minDist, message.get());
+        }
+        // Only a strictly smaller candidate replaces the stored value and is relayed along every outgoing edge.
+        if (minDist < getValue().get()) {
+            setValue(new DoubleWritable(minDist));
+            for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
+                double distance = minDist + edge.getValue().get();
+                sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
+            }
+        }
+        // The vertex votes to halt at the end of every call, whether or not it sent messages.
+        voteToHalt();
+    }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..d89d577 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -63,6 +63,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
@@ -291,7 +292,7 @@
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), sorter, 0, writer, 0);
+                comparatorFactories, nkmFactory), sorter, 0, writer, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -355,10 +356,11 @@
          */
         int[] sortFields = new int[1];
         sortFields[0] = 0;
+        INormalizedKeyComputerFactory nkmFactory = JobGenUtil.getINormalizedKeyComputerFactory(conf);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
-                comparatorFactories), scanner, 0, writer, 0);
+                comparatorFactories, nkmFactory), scanner, 0, writer, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -715,7 +717,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastCheckpointedIteration,
                 WritableComparator.get(vertexIdClass).getClass());
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, recordDescriptor);
+                nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
 
         /**
@@ -739,8 +741,7 @@
          * connect operator descriptors
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
-                sort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9389f62..bbd3a7d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -195,7 +196,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -282,8 +283,8 @@
 
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
@@ -401,7 +402,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -503,8 +504,8 @@
 
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
@@ -601,7 +602,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(lastSuccessfulIteration,
                 WritableComparator.get(vertexIdClass).getClass());
         ExternalSortOperatorDescriptor sort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, recordDescriptor);
+                nkmFactory, sortCmpFactories, recordDescriptor, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, sort);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 287b797..0bccde5 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -138,7 +139,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -241,8 +242,8 @@
          * connect the group-by operator
          */
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
@@ -338,7 +339,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -440,8 +441,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
-                localGby, 0, globalGby, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories,
+                nkmFactory), localGby, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 3b3c9e7..ae2621e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -140,7 +141,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
         /**
@@ -327,7 +328,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index e334095..7645cd6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -137,7 +138,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -153,7 +154,7 @@
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
         /**
@@ -341,7 +342,7 @@
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
         ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
@@ -357,7 +358,7 @@
          * construct global sort operator
          */
         ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
-                nkmFactory, sortCmpFactories, rdUnnestedMessage);
+                nkmFactory, sortCmpFactories, rdUnnestedMessage, Algorithm.QUICK_SORT);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
         /**
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4c7f91d..f599996 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -89,6 +91,7 @@
             UTF8StringPointable.FACTORY);
     private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
             UTF8StringPointable.FACTORY);
+    private INormalizedKeyComputerFactory nmkFactory = new UTF8StringNormalizedKeyComputerFactory();
 
     private void cleanupStores() throws IOException {
         FileUtils.forceMkdir(new File("teststore"));
@@ -211,7 +214,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
         IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory);
         spec.connect(joinWriterConn, sorter, 0, writer, 0);
 
         spec.addRoot(writer);
@@ -284,8 +288,8 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                sorter, 0, writer, 0);
+                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), sorter, 0, writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);
@@ -368,11 +372,11 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                sorter, 0, join, 0);
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), sorter, 0, join, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
-                join, 0, writer, 0);
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory), join, 0, writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);
@@ -477,7 +481,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
         IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 9 },
-                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
+                nmkFactory);
         spec.connect(joinWriterConn, sorter, 0, writer, 0);
 
         spec.addRoot(writer);
@@ -573,7 +578,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
                 keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
-                        UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+                        UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories, nmkFactory), sorter, 0, join,
+                0);
 
         IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
         mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
@@ -581,7 +587,8 @@
         int[] mergeFields = new int[] { 9, 0 };
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
                 new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
-                        UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories), join, 0, writer, 0);
+                        UTF8StringPointable.FACTORY) }), mergeFields, comparatorFactories, nmkFactory), join, 0,
+                writer, 0);
 
         spec.addRoot(writer);
         runTest(spec);