Pass kmer groupby test
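
The kmer group-by path now produces correct output. Changes:

- ReadsKeyValueParserFactory: copy the kmer bytes starting at kmer.getOffset()
  instead of assuming the backing array starts at offset 0.
- AggregateKmerAggregateFactory / MergeKmerAggregateFactory: call
  tupleBuilder.addFieldEndOffset() from init() so the partial tuple carries the
  position-list field that downstream operators expect.
- KMerSequenceWriterFactory / KMerTextWriterFactory: the position count is the
  field length divided by PositionWritable.LENGTH, not the raw byte length.
- JobGenBrujinGraph: feed both group-by jobs through an
  ExternalSortOperatorDescriptor and use a PreclusteredGroupOperatorDescriptor
  for the PRECLUSTER path; the grouping key fields are now passed in by callers.
- Tests: reformat JobRunStepByStepTest, add expected results for the reader and
  kmer-aggregate steps, and add TestUtils.compareWithUnSortedPosition() to
  compare position lists while ignoring their order.

The parser fix is the usual backing-array offset mistake; before/after, with
the same names as in the diff below:

    // before: always reads from index 0 of the shared byte array
    tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
    // after: honor the kmer's own offset into that array
    tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());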
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index e91dd66..b36b82d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -121,7 +121,7 @@
                         throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
                     }
                     tupleBuilder.reset();
-                    tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
+                    tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
                     pos.set(readID, (byte) posInRead);
                     tupleBuilder.addField(pos.getByteArray(), pos.getStartOffset(), pos.getLength());
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index bfc5ae5..4006fb4 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -47,6 +47,7 @@
 
             @Override
             public AggregateState createAggregateStates() {
+                System.out.println("CreateState");
                 return new AggregateState(new ArrayBackedValueStorage());
             }
 
@@ -55,15 +56,18 @@
                     AggregateState state) throws HyracksDataException {
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                 inputVal.reset();
-                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor,tIndex,1));
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 inputVal.append(position);
+
+                // make an empty field
+                tupleBuilder.addFieldEndOffset();
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
-                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor,tIndex,1));
+                position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
                 inputVal.append(position);
             }
 
@@ -80,6 +84,7 @@
                 ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                 try {
                     fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+
                     tupleBuilder.addFieldEndOffset();
                 } catch (IOException e) {
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
index b98db56..0f791a1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -36,9 +36,9 @@
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
-        return new IAggregatorDescriptor (){
+        return new IAggregatorDescriptor() {
 
-            private PositionReference positionReEntry = new PositionReference();
+            private PositionReference position = new PositionReference();
 
             @Override
             public AggregateState createAggregateStates() {
@@ -46,31 +46,35 @@
             }
 
             @Override
-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-                    throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                 inputVal.reset();
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
-                for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
-                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(positionReEntry);
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+                        1); offset += PositionReference.LENGTH) {
+                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
                 }
+                // make a fake field to satisfy the caller
+                tupleBuilder.addFieldEndOffset();
             }
 
             @Override
             public void reset() {
                 // TODO Auto-generated method stub
-                
+
             }
 
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
-                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage)state.state;
+                ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                 int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
-                for( int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH){
-                    positionReEntry.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
-                    inputVal.append(positionReEntry);
+                for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+                        1); offset += PositionReference.LENGTH) {
+                    position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+                    inputVal.append(position);
                 }
             }
 
@@ -88,6 +92,7 @@
                 try {
                     fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
                     tupleBuilder.addFieldEndOffset();
+
                 } catch (IOException e) {
                     throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
@@ -96,10 +101,10 @@
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
         };
-        
+
     }
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 981b2de..0a1ef96 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -71,8 +71,8 @@
                     throw new IllegalArgumentException("Not enough kmer bytes");
                 }
                 reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
-                int countOfPos = tuple.getFieldLength(InputPositionListField);
-                if (countOfPos % PositionWritable.LENGTH != 0) {
+                int countOfPos = tuple.getFieldLength(InputPositionListField) / PositionWritable.LENGTH;
+                if (tuple.getFieldLength(InputPositionListField) % PositionWritable.LENGTH != 0) {
                     throw new IllegalArgumentException("Invalid count of position byte");
                 }
                 plist.setNewReference(countOfPos, tuple.getFieldData(InputPositionListField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index 664a2d2..f614846 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -48,9 +48,11 @@
                 if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
                     throw new IllegalArgumentException("Not enough kmer bytes");
                 }
-                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField), tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
-                int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField);
-                if (countOfPos % PositionWritable.LENGTH != 0) {
+                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+                int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField)
+                        / PositionWritable.LENGTH;
+                if (tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField) % PositionWritable.LENGTH != 0) {
                     throw new IllegalArgumentException("Invalid count of position byte");
                 }
                 plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 2f40337..aa0c05d 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -58,6 +58,7 @@
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -67,6 +68,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
@@ -132,12 +134,12 @@
                         tableSize), true);
     }
 
-    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec,
+    private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
             IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
             ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
             IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
             throws HyracksDataException {
-        int[] keyFields = new int[] { 0 }; // the id of grouped key
+
         Object[] obj = new Object[3];
 
         switch (groupbyType) {
@@ -150,7 +152,9 @@
                 break;
             case PRECLUSTER:
             default:
-                obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+
+                obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
                         combineRed);
                 obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
@@ -185,16 +189,23 @@
 
     public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
             AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // field index of the group-by key
+
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                ReadsKeyValueParserFactory.readKmerOutputRec);
+        connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
 
         RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         jobSpec.setFrameSize(frameSize);
 
-        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
                 new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
                 new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
         AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         logDebug("LocalKmerGroupby Operator");
-        connectOperators(jobSpec, readOperator, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+        connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
 
         logDebug("CrossKmerGroupby Operator");
@@ -218,15 +229,21 @@
 
     public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
             AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+        int[] keyFields = new int[] { 0 }; // field index of the group-by key
         // (ReadID, {(PosInRead, {OtherPosition...}, Kmer), ...})
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+                ReadsKeyValueParserFactory.readKmerOutputRec);
+        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
         RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         RecordDescriptor readIDFinalRec = new RecordDescriptor(
                 new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
-        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
                 new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
                 new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
         AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
-        connectOperators(jobSpec, mapKmerToRead, ncNodeNames, readLocalAggregator, ncNodeNames,
+        connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
 
         logDebug("Group by ReadID merger");
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index da74f27..6cb2491 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -30,202 +30,193 @@
 
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
-	private static final int KmerSize = 5;
-	private static final String ACTUAL_RESULT_DIR = "actual";
-	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final int KmerSize = 5;
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-	private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
-	private static final String HDFS_INPUT_PATH = "/webmap";
-	private static final String HDFS_OUTPUT_PATH = "/webmap_result";
+    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
+    private static final String HDFS_INPUT_PATH = "/webmap";
+    private static final String HDFS_OUTPUT_PATH = "/webmap_result";
 
-	private static final String EXPECTED_DIR = "src/test/resources/expected/";
-	private static final String EXPECTED_READER_RESULT = EXPECTED_DIR
-			+ "result_after_initial_read";
-	private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR
-			+ "result_after_kmerAggregate";
+    private static final String EXPECTED_DIR = "src/test/resources/expected/";
+    private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
+    private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
 
-	private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
-			+ HDFS_OUTPUT_PATH + "/merged.txt";
-	private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
-	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
-			+ File.separator + "conf.xml";
-	private MiniDFSCluster dfsCluster;
+    private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
+    private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
 
-	private JobConf conf = new JobConf();
-	private int numberOfNC = 2;
-	private int numPartitionPerMachine = 1;
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+    private int numPartitionPerMachine = 1;
 
-	private Driver driver;
+    private Driver driver;
 
-	@Test
-	public void TestAll() throws Exception {
-		//TestReader();
-		TestGroupbyKmer();
-		// TestMapKmerToRead();
-		// TestGroupByReadID();
-		// TestEndToEnd();
-	}
+    @Test
+    public void TestAll() throws Exception {
+        //TestReader();
+        TestGroupbyKmer();
+        // TestMapKmerToRead();
+        // TestGroupByReadID();
+        // TestEndToEnd();
+    }
 
-	public void TestReader() throws Exception {
-	    conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-		driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT));
-	}
+    public void TestReader() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, false));
+    }
 
-	public void TestGroupbyKmer() throws Exception {
-	    conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
-		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
-	}
+    public void TestGroupbyKmer() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, true));
+    }
 
-	public void TestMapKmerToRead() throws Exception {
-	    conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
-	}
+    public void TestMapKmerToRead() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+    }
 
-	public void TestGroupByReadID() throws Exception {
-	    conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
-		driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
-	}
+    public void TestGroupByReadID() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+        driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+    }
 
-	public void TestEndToEnd() throws Exception {
-	    conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-		cleanUpReEntry();
-		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
-		driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
-		cleanUpReEntry();
-		conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-		driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-		Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER));
-	}
+    public void TestEndToEnd() throws Exception {
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, false));
+    }
 
-	@Before
-	public void setUp() throws Exception {
-		cleanupStores();
-		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
-		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-		startHDFS();
+    @Before
+    public void setUp() throws Exception {
+        cleanupStores();
+        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
 
-		FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
-		FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
-		conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
-		driver = new Driver(
-				edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
-				edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
-				numPartitionPerMachine);
-	}
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+        driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+                edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
 
-	private void cleanupStores() throws IOException {
-		FileUtils.forceMkdir(new File("teststore"));
-		FileUtils.forceMkdir(new File("build"));
-		FileUtils.cleanDirectory(new File("teststore"));
-		FileUtils.cleanDirectory(new File("build"));
-	}
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
 
-	private void startHDFS() throws IOException {
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
 
-		FileSystem lfs = FileSystem.getLocal(new Configuration());
-		lfs.delete(new Path("build"), true);
-		System.setProperty("hadoop.log.dir", "logs");
-		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-		FileSystem dfs = FileSystem.get(conf);
-		Path src = new Path(DATA_INPUT_PATH);
-		Path dest = new Path(HDFS_INPUT_PATH);
-		dfs.mkdirs(dest);
-		// dfs.mkdirs(result);
-		dfs.copyFromLocalFile(src, dest);
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(DATA_INPUT_PATH);
+        Path dest = new Path(HDFS_INPUT_PATH);
+        dfs.mkdirs(dest);
+        // dfs.mkdirs(result);
+        dfs.copyFromLocalFile(src, dest);
 
-		DataOutputStream confOutput = new DataOutputStream(
-				new FileOutputStream(new File(HADOOP_CONF_PATH)));
-		conf.writeXml(confOutput);
-		confOutput.flush();
-		confOutput.close();
-	}
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
 
-	private void cleanUpReEntry() throws IOException {
-		FileSystem lfs = FileSystem.getLocal(new Configuration());
-		if (lfs.exists(new Path(DUMPED_RESULT))) {
-			lfs.delete(new Path(DUMPED_RESULT), true);
-		}
-		FileSystem dfs = FileSystem.get(conf);
-		if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
-			dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
-		}
-	}
+    private void cleanUpReEntry() throws IOException {
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(DUMPED_RESULT))) {
+            lfs.delete(new Path(DUMPED_RESULT), true);
+        }
+        FileSystem dfs = FileSystem.get(conf);
+        if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+            dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+        }
+    }
 
-	private boolean checkResults(String expectedPath) throws Exception {
-		File dumped = null;
-		String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
-		if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
-			FileUtil.copyMerge(FileSystem.get(conf),
-					new Path(HDFS_OUTPUT_PATH), FileSystem
-							.getLocal(new Configuration()), new Path(
-							DUMPED_RESULT), false, conf, null);
-			dumped = new File(DUMPED_RESULT);
-		} else {
+    private boolean checkResults(String expectedPath, boolean checkPos) throws Exception {
+        File dumped = null;
+        String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
+        if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
+            FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+                    FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
+            dumped = new File(DUMPED_RESULT);
+        } else {
 
-			FileSystem.getLocal(new Configuration()).mkdirs(
-					new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
-			File filePathTo = new File(CONVERT_RESULT);
-			BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
-			for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
-				String partname = "/part-" + i;
-				// FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-				// + partname), FileSystem.getLocal(new Configuration()),
-				// new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
-				// false, conf);
+            FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+            File filePathTo = new File(CONVERT_RESULT);
+            BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+            for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
+                String partname = "/part-" + i;
+                // FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+                // + partname), FileSystem.getLocal(new Configuration()),
+                // new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname),
+                // false, conf);
 
-				Path path = new Path(HDFS_OUTPUT_PATH + partname);
-				FileSystem dfs = FileSystem.get(conf);
-				if (dfs.getFileStatus(path).getLen() == 0) {
-					continue;
-				}
-				SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
-						conf);
+                Path path = new Path(HDFS_OUTPUT_PATH + partname);
+                FileSystem dfs = FileSystem.get(conf);
+                if (dfs.getFileStatus(path).getLen() == 0) {
+                    continue;
+                }
+                SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
 
-				// KmerBytesWritable key = (KmerBytesWritable)
-				// ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-				KmerBytesWritable key = new KmerBytesWritable(conf.getInt(
-						GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN));
-				// KmerCountValue value = (KmerCountValue)
-				// ReflectionUtils.newInstance(reader.getValueClass(), conf);
-				KmerBytesWritable value = null;
-				while (reader.next(key, value)) {
-					if (key == null || value == null) {
-						break;
-					}
-					bw.write(key.toString() + "\t" + value.toString());
-					System.out
-							.println(key.toString() + "\t" + value.toString());
-					bw.newLine();
-				}
-				reader.close();
-			}
-			bw.close();
-			dumped = new File(CONVERT_RESULT);
-		}
+                // KmerBytesWritable key = (KmerBytesWritable)
+                // ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                KmerBytesWritable key = new KmerBytesWritable(conf.getInt(GenomixJobConf.KMER_LENGTH, KmerSize));
+                // KmerCountValue value = (KmerCountValue)
+                // ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                KmerBytesWritable value = null;
+                while (reader.next(key, value)) {
+                    if (key == null || value == null) {
+                        break;
+                    }
+                    bw.write(key.toString() + "\t" + value.toString());
+                    System.out.println(key.toString() + "\t" + value.toString());
+                    bw.newLine();
+                }
+                reader.close();
+            }
+            bw.close();
+            dumped = new File(CONVERT_RESULT);
+        }
 
-		TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
-		return true;
-	}
+        if (checkPos) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped);
+        } else {
+            TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
+        }
+        return true;
+    }
 
-	@After
-	public void tearDown() throws Exception {
-		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
-		cleanupHDFS();
-	}
+    @After
+    public void tearDown() throws Exception {
+        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+        cleanupHDFS();
+    }
 
-	private void cleanupHDFS() throws Exception {
-		dfsCluster.shutdown();
-	}
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
 }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index bcfdedd..e97df1b 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -61,6 +61,38 @@
         }
     }
 
+    public static void compareWithUnSortedPosition(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        ArrayList<String> actualLines = new ArrayList<String>();
+        String lineExpected, lineActual;
+        try {
+            while ((lineActual = readerActual.readLine()) != null) {
+                actualLines.add(lineActual);
+            }
+            Collections.sort(actualLines);
+            int num = 1;
+            for (String actualLine : actualLines) {
+                lineExpected = readerExpected.readLine();
+                if (lineExpected == null) {
+                    throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
+                }
+                if (!containStrings(lineExpected, actualLine)) {
+                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+                            + actualLine);
+                }
+                ++num;
+            }
+            lineExpected = readerExpected.readLine();
+            if (lineExpected != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+            }
+        } finally {
+            readerActual.close();
+            readerExpected.close();
+        }
+    }
+
     public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
         BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -130,4 +162,40 @@
         return true;
     }
 
+    private static boolean containStrings(String lineExpected, String actualLine) {
+        String keyExp = lineExpected.split("\\\t")[0];
+        String keyAct = actualLine.split("\\\t")[0];
+        if (!keyAct.equals(keyExp)) {
+            return false;
+        }
+
+        ArrayList<String> posExp = new ArrayList<String>();
+        ArrayList<String> posAct = new ArrayList<String>();
+
+        String valueExp = lineExpected.split("\\\t")[1];
+        String[] valuesExp = valueExp.substring(1, valueExp.length() - 1).split(",");
+
+        for (String str : valuesExp) {
+            posExp.add(str);
+        }
+
+        String valueAct = actualLine.split("\\\t")[1];
+        String[] valuesAct = valueAct.substring(1, valueAct.length() - 1).split(",");
+
+        for (String str : valuesAct) {
+            posAct.add(str);
+        }
+
+        if (posExp.size() != posAct.size()) {
+            return false;
+        }
+        Collections.sort(posExp);
+        Collections.sort(posAct);
+        for (int i = 0; i < posExp.size(); i++) {
+            if (!posExp.get(i).equals(posAct.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
new file mode 100644
index 0000000..728c093
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -0,0 +1,20 @@
+AATAG	(1,0)
+AATAG	(2,0)
+AATAG	(3,0)
+AATAG	(4,0)
+AATAG	(5,0)
+AGAAG	(1,3)
+AGAAG	(2,3)
+AGAAG	(3,3)
+AGAAG	(4,3)
+AGAAG	(5,3)
+ATAGA	(1,1)
+ATAGA	(2,1)
+ATAGA	(3,1)
+ATAGA	(4,1)
+ATAGA	(5,1)
+TAGAA	(1,2)
+TAGAA	(2,2)
+TAGAA	(3,2)
+TAGAA	(4,2)
+TAGAA	(5,2)
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
new file mode 100644
index 0000000..d5624d7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -0,0 +1,4 @@
+AATAG	[(1,0),(2,0),(3,0),(4,0),(5,0)]
+AGAAG	[(1,3),(2,3),(3,3),(4,3),(5,3)]
+ATAGA	[(1,1),(2,1),(3,1),(4,1),(5,1)]
+TAGAA	[(1,2),(2,2),(3,2),(4,2),(5,2)]