Get the map-kmer-to-readID operator test passing
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index a578543..40817e8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -10,7 +10,7 @@
 
 import edu.uci.ics.genomix.data.Marshal;
 
-public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable{
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
     /**
      * 
      */
@@ -121,6 +121,13 @@
         valueCount += 1;
     }
 
+    public static int getCountByDataLength(int length) {
+        if (length % PositionWritable.LENGTH != 0) {
+            throw new IllegalArgumentException("Length of position list is invalid: " + length);
+        }
+        return length / PositionWritable.LENGTH;
+    }
+
     public int getCountOfPosition() {
         return valueCount;
     }
@@ -151,14 +158,18 @@
     }
 
     @Override
-    public String toString(){
+    public String toString() {
         StringBuilder sbuilder = new StringBuilder();
         sbuilder.append('[');
-        for(PositionWritable pos : this){
+        for (PositionWritable pos : this) {
             sbuilder.append(pos.toString());
             sbuilder.append(',');
         }
-        sbuilder.setCharAt(sbuilder.length()-1, ']');
+        if (valueCount > 0) {
+            sbuilder.setCharAt(sbuilder.length() - 1, ']');
+        } else {
+            sbuilder.append(']');
+        }
         return sbuilder.toString();
     }
 }
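
Note: a minimal sketch (not part of the patch) exercising the two changes above, assuming PositionWritable.LENGTH is 5 bytes (a 4-byte readID plus a 1-byte position, mirroring the PositionReference.INTBYTES + 1 layout used later in this patch):

    PositionListWritable plist = new PositionListWritable();
    // Empty list: the new branch appends ']' instead of overwriting the
    // (nonexistent) trailing comma, so this prints "[]" instead of "]".
    System.out.println(plist.toString());
    // 10 bytes of backing data hold exactly two (readID, posInRead) entries.
    int count = PositionListWritable.getCountByDataLength(10); // == 2
    // getCountByDataLength(7) would throw IllegalArgumentException, since 7
    // is not a multiple of PositionWritable.LENGTH.
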
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
index 4d00731..1f3efbf 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDNormarlizedComputeFactory.java
@@ -1,8 +1,8 @@
 package edu.uci.ics.genomix.hyracks.data.accessors;
 
+import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class ReadIDNormarlizedComputeFactory implements INormalizedKeyComputerFactory{
 
@@ -17,7 +17,7 @@
 
             @Override
             public int normalize(byte[] bytes, int start, int length) {
-                return IntegerSerializerDeserializer.getInt(bytes, start);
+                return Marshal.getInt(bytes, start);
             }
             
         };
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
index d328bd1..7229364 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -2,10 +2,10 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.genomix.data.Marshal;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory {
 
@@ -26,7 +26,7 @@
 
                 ByteBuffer buf = accessor.getBuffer();
 
-                int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+                int hash = Marshal.getInt(buf.array(), startOffset + fieldOffset + slotLength);
                 if (hash < 0) {
                     hash = -(hash + 1);
                 }
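
Note: both files above now decode the readID with the project's own Marshal.getInt rather than Hyracks' IntegerSerializerDeserializer.getInt. A sketch of the decode these call sites rely on, on the assumption that Marshal keeps the same big-endian layout as the serializer it replaces (the real implementation lives in edu.uci.ics.genomix.data.Marshal):

    // Hypothetical equivalent of Marshal.getInt: four bytes, most significant first.
    static int getInt(byte[] bytes, int offset) {
        return ((bytes[offset] & 0xff) << 24) | ((bytes[offset + 1] & 0xff) << 16)
                | ((bytes[offset + 2] & 0xff) << 8) | (bytes[offset + 3] & 0xff);
    }
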
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
index 09c0d2e..84334c6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -18,186 +18,160 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
-public class MapKmerPositionToReadOperator extends
-		AbstractSingleActivityOperatorDescriptor {
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
 
-	public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec,
-			RecordDescriptor recDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = recDesc;
-	}
+    public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recDesc;
+    }
 
-	private static final long serialVersionUID = 1L;
-	public static final int InputKmerField = 0;
-	public static final int InputPosListField = 1;
+    private static final long serialVersionUID = 1L;
+    public static final int InputKmerField = 0;
+    public static final int InputPosListField = 1;
 
-	public static final int OutputReadIDField = 0;
-	public static final int OutputPosInReadField = 1;
-	public static final int OutputOtherReadIDListField = 2;
-	public static final int OutputKmerField = 3; // may not needed
+    public static final int OutputReadIDField = 0;
+    public static final int OutputPosInReadField = 1;
+    public static final int OutputOtherReadIDListField = 2;
+    public static final int OutputKmerField = 3; // may not be needed
 
-	public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(
-			new ISerializerDeserializer[] { null, null, null, null });
+    public static final RecordDescriptor readIDOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+            null, null, null });
 
-	/**
-	 * Map (Kmer, {(ReadID,PosInRead),...}) into
-	 * (ReadID,PosInRead,{OtherReadID,...},*Kmer*) OtherReadID appears only when
-	 * otherReadID.otherPos==0
-	 */
-	public class MapKmerPositionToReadNodePushable extends
-			AbstractUnaryInputUnaryOutputOperatorNodePushable {
-		private final IHyracksTaskContext ctx;
-		private final RecordDescriptor inputRecDesc;
-		private final RecordDescriptor outputRecDesc;
+    /**
+     * Map (Kmer, {(ReadID,PosInRead),...}) into
+     * (ReadID, PosInRead, {OtherReadID,...}, *Kmer*). For an entry with
+     * PosInRead != 0 the other-read list holds the positions at 0, and vice versa.
+     */
+    public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+        private final IHyracksTaskContext ctx;
+        private final RecordDescriptor inputRecDesc;
+        private final RecordDescriptor outputRecDesc;
 
-		private FrameTupleAccessor accessor;
-		private ByteBuffer writeBuffer;
-		private ArrayTupleBuilder builder;
-		private FrameTupleAppender appender;
+        private FrameTupleAccessor accessor;
+        private ByteBuffer writeBuffer;
+        private ArrayTupleBuilder builder;
+        private FrameTupleAppender appender;
 
-		private PositionReference positionEntry;
-		private ArrayBackedValueStorage posListEntry;
-		private ArrayBackedValueStorage zeroPositionCollection;
-		private ArrayBackedValueStorage noneZeroPositionCollection;
+        private PositionReference positionEntry;
+        private ArrayBackedValueStorage posListEntry;
+        private ArrayBackedValueStorage zeroPositionCollection;
+        private ArrayBackedValueStorage noneZeroPositionCollection;
 
-		public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx,
-				RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc) {
-			this.ctx = ctx;
-			this.inputRecDesc = inputRecDesc;
-			this.outputRecDesc = outputRecDesc;
-			this.positionEntry = new PositionReference();
-			this.posListEntry = new ArrayBackedValueStorage();
-			this.zeroPositionCollection = new ArrayBackedValueStorage();
-			this.noneZeroPositionCollection = new ArrayBackedValueStorage();
-		}
+        public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+                RecordDescriptor outputRecDesc) {
+            this.ctx = ctx;
+            this.inputRecDesc = inputRecDesc;
+            this.outputRecDesc = outputRecDesc;
+            this.positionEntry = new PositionReference();
+            this.posListEntry = new ArrayBackedValueStorage();
+            this.zeroPositionCollection = new ArrayBackedValueStorage();
+            this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+        }
 
-		@Override
-		public void open() throws HyracksDataException {
-			accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
-			writeBuffer = ctx.allocateFrame();
-			builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
-			appender = new FrameTupleAppender(ctx.getFrameSize());
-			appender.reset(writeBuffer, true);
-			writer.open();
-			posListEntry.reset();
-		}
+        @Override
+        public void open() throws HyracksDataException {
+            accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+            writeBuffer = ctx.allocateFrame();
+            builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(writeBuffer, true);
+            writer.open();
+            posListEntry.reset();
+        }
 
-		@Override
-		public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-			accessor.reset(buffer);
-			int tupleCount = accessor.getTupleCount();
-			for (int i = 0; i < tupleCount; i++) {
-				scanPosition(i, zeroPositionCollection,
-						noneZeroPositionCollection);
-				scanAgainToOutputTuple(i, zeroPositionCollection,
-						noneZeroPositionCollection, builder);
-			}
-		}
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            accessor.reset(buffer);
+            int tupleCount = accessor.getTupleCount();
+            for (int i = 0; i < tupleCount; i++) {
+                scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+                scanAgainToOutputTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+            }
+        }
 
-		private void scanPosition(int tIndex,
-				ArrayBackedValueStorage zeroPositionCollection2,
-				ArrayBackedValueStorage noneZeroPositionCollection2) {
-			zeroPositionCollection2.reset();
-			noneZeroPositionCollection2.reset();
-			byte[] data = accessor.getBuffer().array();
-			int offsetPoslist = accessor.getTupleStartOffset(tIndex)
-					+ accessor.getFieldSlotsLength()
-					+ accessor.getFieldStartOffset(tIndex, InputPosListField);
-			for (int i = 0; i < accessor.getFieldLength(tIndex,
-					InputPosListField); i += PositionReference.LENGTH) {
-				positionEntry.setNewReference(data, offsetPoslist + i);
-				if (positionEntry.getPosInRead() == 0) {
-					zeroPositionCollection2.append(positionEntry);
-				} else {
-					noneZeroPositionCollection2.append(positionEntry);
-				}
-			}
+        private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+                ArrayBackedValueStorage noneZeroPositionCollection2) {
+            zeroPositionCollection2.reset();
+            noneZeroPositionCollection2.reset();
+            byte[] data = accessor.getBuffer().array();
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewReference(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() == 0) {
+                    zeroPositionCollection2.append(positionEntry);
+                } else {
+                    noneZeroPositionCollection2.append(positionEntry);
+                }
+            }
 
-		}
+        }
 
-		private void scanAgainToOutputTuple(int tIndex,
-				ArrayBackedValueStorage zeroPositionCollection,
-				ArrayBackedValueStorage noneZeroPositionCollection,
-				ArrayTupleBuilder builder2) {
-			byte[] data = accessor.getBuffer().array();
-			int offsetPoslist = accessor.getTupleStartOffset(tIndex)
-					+ accessor.getFieldSlotsLength()
-					+ accessor.getFieldStartOffset(tIndex, InputPosListField);
-			for (int i = 0; i < accessor.getFieldLength(tIndex,
-					InputPosListField); i += PositionReference.LENGTH) {
-				positionEntry.setNewReference(data, offsetPoslist + i);
-				if (positionEntry.getPosInRead() != 0) {
-					appendNodeToBuilder(tIndex, positionEntry,
-							zeroPositionCollection, builder2);
-				} else {
-					appendNodeToBuilder(tIndex, positionEntry,
-							noneZeroPositionCollection, builder2);
-				}
-			}
-		}
+        private void scanAgainToOutputTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+                ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+            byte[] data = accessor.getBuffer().array();
+            int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                    + accessor.getFieldStartOffset(tIndex, InputPosListField);
+            for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+                positionEntry.setNewReference(data, offsetPoslist + i);
+                if (positionEntry.getPosInRead() != 0) {
+                    appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+                } else {
+                    appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+                }
+            }
+        }
 
-		private void appendNodeToBuilder(int tIndex, PositionReference pos,
-				ArrayBackedValueStorage posList2, ArrayTupleBuilder builder2) {
-			try {
-				builder2.addField(pos.getByteArray(), 0,
-						PositionReference.INTBYTES);
-				builder2.addField(pos.getByteArray(),
-						PositionReference.INTBYTES, 1);
-				// ? ask Yingyi, if support empty bytes[]
-				if (posList2 == null) {
-					builder2.addFieldEndOffset();
-				} else {
-					builder2.addField(posList2.getByteArray(),
-							posList2.getStartOffset(), posList2.getLength());
-				}
-				// set kmer, may not useful
-				byte[] data = accessor.getBuffer().array();
-				int offsetKmer = accessor.getTupleStartOffset(tIndex)
-						+ accessor.getFieldSlotsLength()
-						+ accessor.getFieldStartOffset(tIndex, InputKmerField);
-				builder2.addField(data, offsetKmer,
-						accessor.getFieldLength(tIndex, InputKmerField));
+        private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+                ArrayTupleBuilder builder2) {
+            try {
+                builder2.addField(pos.getByteArray(), pos.getStartOffset(), PositionReference.INTBYTES);
+                builder2.addField(pos.getByteArray(), pos.getStartOffset() + PositionReference.INTBYTES, 1);
+                if (posList2 == null) {
+                    builder2.addFieldEndOffset();
+                } else {
+                    builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+                }
+                // set kmer; may not be useful
+                byte[] data = accessor.getBuffer().array();
+                int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+                        + accessor.getFieldStartOffset(tIndex, InputKmerField);
+                builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
 
-				if (!appender.append(builder2.getFieldEndOffsets(),
-						builder2.getByteArray(), 0, builder2.getSize())) {
-					FrameUtils.flushFrame(writeBuffer, writer);
-					appender.reset(writeBuffer, true);
-					if (!appender.append(builder2.getFieldEndOffsets(),
-							builder2.getByteArray(), 0, builder2.getSize())) {
-						throw new IllegalStateException();
-					}
-				}
-				builder2.reset();
-			} catch (HyracksDataException e) {
-				throw new IllegalStateException(
-						"Failed to Add a field to the tuple by copying the data bytes from a byte array.");
-			}
-		}
+                if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
+                    if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+                        throw new IllegalStateException("Tuple is too large to fit into an empty frame");
+                    }
+                }
+                builder2.reset();
+            } catch (HyracksDataException e) {
+                throw new IllegalStateException(
+                        "Failed to add a field to the tuple by copying the data bytes from a byte array.", e);
+            }
+        }
 
-		@Override
-		public void fail() throws HyracksDataException {
-			writer.fail();
-		}
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
 
-		@Override
-		public void close() throws HyracksDataException {
-			if (appender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(writeBuffer, writer);
-			}
-			writer.close();
-		}
+        @Override
+        public void close() throws HyracksDataException {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+        }
 
-	}
+    }
 
-	@Override
-	public AbstractOperatorNodePushable createPushRuntime(
-			IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, int partition,
-			int nPartitions) {
-		return new MapKmerPositionToReadNodePushable(
-				ctx,
-				recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
-				recordDescriptors[0]);
-	}
+    @Override
+    public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+                0), recordDescriptors[0]);
+    }
 
 }
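
Note: the rewrite above is mostly a reformat, plus one real fix in appendNodeToBuilder: the first two addField calls now start at pos.getStartOffset() instead of 0. The operator's two-pass scan is easier to see on plain collections; a simplified sketch, where Position is a hypothetical (readID, posInRead) pair standing in for the frame-based PositionReference:

    import java.util.ArrayList;
    import java.util.List;

    class Position {
        final int readID;
        final byte posInRead;
        Position(int readID, byte posInRead) { this.readID = readID; this.posInRead = posInRead; }
    }

    void mapKmer(String kmer, List<Position> kmerPositions) {
        // Pass 1: split the kmer's position list by whether the kmer starts its read.
        List<Position> zeros = new ArrayList<Position>();
        List<Position> nonZeros = new ArrayList<Position>();
        for (Position p : kmerPositions) {
            (p.posInRead == 0 ? zeros : nonZeros).add(p);
        }
        // Pass 2: emit (ReadID, PosInRead, {other positions}, Kmer) per entry,
        // attaching the complementary collection.
        for (Position p : kmerPositions) {
            List<Position> others = (p.posInRead != 0) ? zeros : nonZeros;
            System.out.println(p.readID + "\t" + p.posInRead + "\t" + others + "\t" + kmer);
        }
    }
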
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index b36b82d..37d5289 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -47,14 +47,16 @@
     private KmerBytesWritable kmer;
     private PositionReference pos;
     private boolean bReversed;
+    private final int readLength;
 
     public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
             null });
 
-    public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
+    public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
         bReversed = bGenerateReversed;
         kmer = new KmerBytesWritable(k);
         pos = new PositionReference();
+        this.readLength = readlength;
     }
 
     @Override
@@ -76,7 +78,7 @@
                 try {
                     readID = Integer.parseInt(geneLine[0]);
                 } catch (NumberFormatException e) {
-                    LOG.warn("Invalid data");
+                    LOG.warn("Invalid data: cannot parse readID from " + geneLine[0]);
                     return;
                 }
 
@@ -84,6 +86,10 @@
                 Matcher geneMatcher = genePattern.matcher(geneLine[1]);
                 boolean isValid = geneMatcher.matches();
                 if (isValid) {
+                    if (geneLine[1].length() != readLength) {
+                        LOG.warn("Invalid read length at: " + readID);
+                        return;
+                    }
                     SplitReads(readID, geneLine[1].getBytes(), writer);
                 }
             }
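
Note: the new length guard matters downstream: MergeReadIDAggregateFactory (later in this patch) sizes its per-position storage as readLength - kmerLength + 1, so a read of the wrong length would later trip its "Final aggregate position number is invalid" check. With the test settings (readLength = 8, k = 5), each valid read yields kmer positions 0..3:

    String read = "AATAGAAG";                       // one line from text.txt
    int k = 5;
    for (int p = 0; p + k <= read.length(); p++) {  // positions 0..3
        System.out.println(read.substring(p, p + k) + "\t(1," + p + ")");
        // prints the same (kmer, position) pairs as result_after_initial_read
    }
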
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 4006fb4..6ee7946 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -47,7 +47,6 @@
 
             @Override
             public AggregateState createAggregateStates() {
-                System.out.println("CreateState");
                 return new AggregateState(new ArrayBackedValueStorage());
             }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
index c7552ca..fe9dab5 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateReadIDAggregateFactory.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -21,6 +22,7 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+
     public static final int InputReadIDField = MapKmerPositionToReadOperator.OutputReadIDField;
     public static final int InputPosInReadField = MapKmerPositionToReadOperator.OutputPosInReadField;
     public static final int InputPositionListField = MapKmerPositionToReadOperator.OutputOtherReadIDListField;
@@ -29,6 +31,9 @@
     public static final int OutputReadIDField = 0;
     public static final int OutputPositionListField = 1;
 
+    public static final RecordDescriptor readIDAggregateRec = new RecordDescriptor(new ISerializerDeserializer[] {
+            null, null });
+
     public AggregateReadIDAggregateFactory() {
     }
 
@@ -65,7 +70,7 @@
                 ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
                 storage.reset();
                 DataOutput out = storage.getDataOutput();
-                byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
 
                 try {
                     out.writeByte(posInRead);
@@ -74,7 +79,8 @@
                 } catch (IOException e) {
                     throw new HyracksDataException("Failed to write into temporary storage");
                 }
-
+                // append a placeholder (empty) field for the position list
+                tupleBuilder.addFieldEndOffset();
             }
 
             private void writeBytesToStorage(DataOutput out, IFrameTupleAccessor accessor, int tIndex, int idField)
@@ -95,7 +101,7 @@
                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
                 ArrayBackedValueStorage storage = (ArrayBackedValueStorage) state.state;
                 DataOutput out = storage.getDataOutput();
-                byte posInRead = readByteField(accessor, tIndex, InputPositionListField);
+                byte posInRead = readByteField(accessor, tIndex, InputPosInReadField);
 
                 try {
                     out.writeByte(posInRead);
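
Note: the placeholder field appended in init/aggregate above relies on ArrayTupleBuilder.addFieldEndOffset() closing a field without copying any bytes; the real payload stays in the ArrayBackedValueStorage state until output time. A minimal sketch of the trick in isolation:

    // Two zero-length fields: the field-offset array grows, the data section stays empty.
    ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
    tb.addFieldEndOffset(); // field 0: zero-length
    tb.addFieldEndOffset(); // field 1: zero-length placeholder
    // Downstream operators still see the declared two-field layout.
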
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
index d3971fb..fb36898 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -14,57 +14,55 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class MergeReadIDAggregateFactory implements
-		IAggregatorDescriptorFactory {
+public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
 
-	/**
+    /**
      * 
      */
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final int ValidPosCount;
+    private final int ValidPosCount;
 
-	public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
-		ValidPosCount = getPositionCount(readLength, kmerLength);
-	}
+    public MergeReadIDAggregateFactory(int readLength, int kmerLength) {
+        ValidPosCount = getPositionCount(readLength, kmerLength);
+    }
 
-	public static int getPositionCount(int readLength, int kmerLength) {
-		return readLength - kmerLength + 1;
-	}
+    public static int getPositionCount(int readLength, int kmerLength) {
+        return readLength - kmerLength + 1;
+    }
 
-	public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
-	public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
+    public static final int InputReadIDField = AggregateReadIDAggregateFactory.OutputReadIDField;
+    public static final int InputPositionListField = AggregateReadIDAggregateFactory.OutputPositionListField;
 
-	public static final int BYTE_SIZE = 1;
-	public static final int INTEGER_SIZE = 4;
+    public static final int BYTE_SIZE = 1;
+    public static final int INTEGER_SIZE = 4;
 
-	/**
-	 * (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} to Aggregate as
-	 * (ReadID, Storage[posInRead]={PositionList,Kmer})
-	 * 
-	 */
-	@Override
+    /**
+     * Aggregate (ReadID, {(PosInRead,{OtherPosition...},Kmer) ...}) into
+     * (ReadID, Storage[posInRead]={PositionList,Kmer})
+     */
+    @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
             throws HyracksDataException {
         return new IAggregatorDescriptor() {
-        	
-        	class PositionArray{
-        		public ArrayBackedValueStorage[] storages;
-        		public int count;
-        		
-        		public PositionArray(ArrayBackedValueStorage[] storages2, int i){
-        			storages = storages2;
-        			count = i;
-        		}
-        		
-        		public void reset(){
-        			for (ArrayBackedValueStorage each : storages) {
+
+            class PositionArray {
+                public ArrayBackedValueStorage[] storages;
+                public int count;
+
+                public PositionArray(ArrayBackedValueStorage[] storages2, int i) {
+                    storages = storages2;
+                    count = i;
+                }
+
+                public void reset() {
+                    for (ArrayBackedValueStorage each : storages) {
                         each.reset();
                     }
                     count = 0;
-        		}
-        	}
+                }
+            }
 
             @Override
             public AggregateState createAggregateStates() {
@@ -72,7 +70,7 @@
                 for (int i = 0; i < storages.length; i++) {
                     storages[i] = new ArrayBackedValueStorage();
                 }
-                return new AggregateState(new PositionArray(storages,0));
+                return new AggregateState(new PositionArray(storages, 0));
             }
 
             @Override
@@ -98,6 +96,10 @@
                     fieldOffset += writeBytesToStorage(storages[posInRead], fieldBuffer, fieldOffset);
                     positionArray.count++;
                 }
+                // append placeholder (empty) fields, one per valid kmer position
+                for (int i = 0; i < ValidPosCount; i++) {
+                    tupleBuilder.addFieldEndOffset();
+                }
             }
 
             private int writeBytesToStorage(ArrayBackedValueStorage storage, ByteBuffer fieldBuffer, int fieldOffset)
@@ -152,7 +154,7 @@
             @Override
             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-            	PositionArray positionArray = (PositionArray) state.state;
+                PositionArray positionArray = (PositionArray) state.state;
                 ArrayBackedValueStorage[] storages = positionArray.storages;
                 if (positionArray.count != storages.length) {
                     throw new IllegalStateException("Final aggregate position number is invalid");
@@ -160,7 +162,8 @@
                 DataOutput fieldOutput = tupleBuilder.getDataOutput();
                 try {
                     for (int i = 0; i < storages.length; i++) {
-                        fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(), storages[i].getLength());
+                        fieldOutput.write(storages[i].getByteArray(), storages[i].getStartOffset(),
+                                storages[i].getLength());
                         tupleBuilder.addFieldEndOffset();
                     }
                 } catch (IOException e) {
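
Note: a worked example of getPositionCount with the test settings used below (ReadLength = 8, KmerSize = 5):

    // Each read contributes one tuple per kmer position, so the merge expects
    // exactly 8 - 5 + 1 == 4 positions per readID; outputFinalResult throws
    // "Final aggregate position number is invalid" if any position is missing.
    int validPosCount = MergeReadIDAggregateFactory.getPositionCount(8, 5); // == 4
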
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
deleted file mode 100644
index 32e7b4d..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package edu.uci.ics.genomix.hyracks.dataflow.io;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class MapperKmerToReadIDTextWriterFactory implements ITupleWriterFactory {
-
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    public MapperKmerToReadIDTextWriterFactory(int kmerSize) {
-        // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
deleted file mode 100644
index 28b8484..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package edu.uci.ics.genomix.hyracks.dataflow.io;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
-import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
-
-public class ReadIDAggregationTextWriterFactory implements ITupleWriterFactory {
-
-    /**
-     * 
-     */
-    private static final long serialVersionUID = 1L;
-
-    public ReadIDAggregationTextWriterFactory(int kmerSize) {
-        // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index aa0c05d..8e45884 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -173,7 +173,7 @@
                     .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
 
             return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
-                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength, kmerSize,
                             bGenerateReversedKmer));
         } catch (Exception e) {
             throw new HyracksDataException(e);
@@ -232,16 +232,16 @@
         int[] keyFields = new int[] { 0 }; // the id of grouped key
         // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
-                ReadsKeyValueParserFactory.readKmerOutputRec);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                MapKmerPositionToReadOperator.readIDOutputRec);
         connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
                 jobSpec));
-        RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+
         RecordDescriptor readIDFinalRec = new RecordDescriptor(
                 new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
         Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
-                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
-                new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
+                new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+                IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
         AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
         connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
@@ -327,9 +327,6 @@
         logDebug("Group by Read Operator");
         lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
 
-        logDebug("Map ReadInfo to Node");
-        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
-
         logDebug("Write node to result");
         lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
 
@@ -352,9 +349,9 @@
         bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
 
         String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        if (type.equalsIgnoreCase("external")) {
+        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
             groupbyType = GroupbyType.EXTERNAL;
-        } else if (type.equalsIgnoreCase("precluster")) {
+        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
             groupbyType = GroupbyType.PRECLUSTER;
         } else {
             groupbyType = GroupbyType.HYBRIDHASH;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
index 8a11379..d5e52fa 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -1,14 +1,17 @@
 package edu.uci.ics.genomix.hyracks.job;
 
+import java.io.DataOutput;
 import java.util.Map;
 
-import edu.uci.ics.genomix.hyracks.dataflow.io.ReadIDAggregationTextWriterFactory;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
@@ -46,22 +49,52 @@
         logDebug("Group by Read Operator");
         lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
 
-        logDebug("Map ReadInfo to Node");
-        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
-
         logDebug("Write node to result");
-        generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
-
+        lastOperator = generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+        jobSpec.addRoot(lastOperator);
         return jobSpec;
     }
 
     public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
             AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
-        ITupleWriterFactory readWriter = new ReadIDAggregationTextWriterFactory(kmerSize);
-        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), readWriter);
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    /**
+                     * 
+                     */
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        // no-op writer for now: open/write/close do nothing, which matches the empty expected result file
+                        return new ITupleWriter() {
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+                                // TODO Auto-generated method stub
+
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                // TODO Auto-generated method stub
+
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+                                // TODO Auto-generated method stub
+
+                            }
+
+                        };
+                    }
+
+                });
         connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
-        jobSpec.addRoot(writeKmerOperator);
+
         return writeKmerOperator;
     }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
index 4fa554f..f2cad73 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -1,14 +1,23 @@
 package edu.uci.ics.genomix.hyracks.job;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Map;
 
-import edu.uci.ics.genomix.hyracks.dataflow.io.MapperKmerToReadIDTextWriterFactory;
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
@@ -26,13 +35,79 @@
         super(job, scheduler, ncMap, numPartitionPerMachine);
         // TODO Auto-generated constructor stub
     }
-    
-    public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec, AbstractOperatorDescriptor mapper) throws HyracksException{
+
+    public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapper) throws HyracksException {
         // Output Kmer
-        ITupleWriterFactory kmerWriter = new MapperKmerToReadIDTextWriterFactory(kmerSize);
-        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), kmerWriter);
-        connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    /**
+                     * 
+                     */
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        return new ITupleWriter() {
+
+                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                            private PositionListWritable plist = new PositionListWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+                                // TODO Auto-generated method stub
+
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                try {
+                                    int readID = Marshal.getInt(
+                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputReadIDField),
+                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputReadIDField));
+                                    byte posInRead = tuple
+                                            .getFieldData(MapKmerPositionToReadOperator.OutputPosInReadField)[tuple
+                                            .getFieldStart(MapKmerPositionToReadOperator.OutputPosInReadField)];
+                                    int posCount = PositionListWritable.getCountByDataLength(tuple
+                                            .getFieldLength(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+                                    plist.setNewReference(
+                                            posCount,
+                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputOtherReadIDListField),
+                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputOtherReadIDListField));
+
+                                    if (kmer.getLength() > tuple
+                                            .getFieldLength(MapKmerPositionToReadOperator.OutputKmerField)) {
+                                        throw new IllegalArgumentException("Not enough kmer bytes");
+                                    }
+                                    kmer.setNewReference(
+                                            tuple.getFieldData(MapKmerPositionToReadOperator.OutputKmerField),
+                                            tuple.getFieldStart(MapKmerPositionToReadOperator.OutputKmerField));
+
+                                    String out = readID + "\t" + (int) posInRead + "\t";
+                                    output.write(out.getBytes());
+                                    output.write(plist.toString().getBytes());
+                                    output.writeByte('\t');
+                                    output.write(kmer.toString().getBytes());
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+                                // TODO Auto-generated method stub
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames, new OneToOneConnectorDescriptor(
+                jobSpec));
         jobSpec.addRoot(writeKmerOperator);
         return writeKmerOperator;
     }
@@ -51,7 +126,7 @@
         lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
 
         generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
-        
+
         return jobSpec;
     }
 }
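
Note: the inline ITupleWriter above replaces the deleted MapperKmerToReadIDTextWriterFactory stub and emits one tab-separated line per tuple; the format matches the new expected file result_after_kmer2readId:

    // readID <TAB> posInRead <TAB> [other positions] <TAB> kmer
    String line = 1 + "\t" + 3 + "\t" + "[(6,0)]" + "\t" + "AGAAG";
    // i.e. read 1's kmer at position 3 (AGAAG) also occurs at position 0 of read 6.
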
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 6cb2491..631fc5b 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -31,6 +31,7 @@
 @SuppressWarnings("deprecation")
 public class JobRunStepByStepTest {
     private static final int KmerSize = 5;
+    private static final int ReadLength = 8;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
@@ -41,6 +42,8 @@
     private static final String EXPECTED_DIR = "src/test/resources/expected/";
     private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
     private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
+    private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
+    private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
 
     private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
     private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
@@ -56,36 +59,40 @@
     @Test
     public void TestAll() throws Exception {
         //TestReader();
-        TestGroupbyKmer();
-        // TestMapKmerToRead();
-        // TestGroupByReadID();
+        //TestGroupbyKmer();
+        //TestMapKmerToRead();
+        TestGroupByReadID();
         // TestEndToEnd();
     }
 
     public void TestReader() throws Exception {
+        cleanUpReEntry();
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, false));
+        Assert.assertEquals(true, checkResults(EXPECTED_READER_RESULT, -1));
     }
 
     public void TestGroupbyKmer() throws Exception {
+        cleanUpReEntry();
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_KMERHASHTABLE, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, true));
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, 1));
     }
 
     public void TestMapKmerToRead() throws Exception {
+        cleanUpReEntry();
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
         driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_MAP_KMER_TO_READ, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+        Assert.assertEquals(true, checkResults(EXPECTED_KMER_TO_READID, 2));
     }
 
     public void TestGroupByReadID() throws Exception {
+        cleanUpReEntry();
         conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
-        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
+        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.OUTPUT_GROUPBY_READID, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+        Assert.assertEquals(true, checkResults(EXPECTED_GROUPBYREADID, -1));
     }
 
     public void TestEndToEnd() throws Exception {
@@ -93,11 +100,11 @@
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_EXTERNAL);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, -1));
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER,false));
+        Assert.assertEquals(true, checkResults(EXPECTED_OUPUT_KMER, -1));
     }
 
     @Before
@@ -112,6 +119,7 @@
         FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
         conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+        conf.setInt(GenomixJobConf.READ_LENGTH, ReadLength);
         driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
                 edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
     }
@@ -156,7 +164,7 @@
         }
     }
 
-    private boolean checkResults(String expectedPath, boolean checkPos) throws Exception {
+    private boolean checkResults(String expectedPath, int poslistField) throws Exception {
         File dumped = null;
         String format = conf.get(GenomixJobConf.OUTPUT_FORMAT);
         if (GenomixJobConf.OUTPUT_FORMAT_TEXT.equalsIgnoreCase(format)) {
@@ -202,8 +210,8 @@
             dumped = new File(CONVERT_RESULT);
         }
 
-        if (checkPos) {
-            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped);
+        if (poslistField > 0) {
+            TestUtils.compareWithUnSortedPosition(new File(expectedPath), dumped, poslistField);
         } else {
             TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
         }
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
index e97df1b..49e21ba 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/TestUtils.java
@@ -61,7 +61,8 @@
         }
     }
 
-    public static void compareWithUnSortedPosition(File expectedFile, File actualFile) throws Exception {
+    public static void compareWithUnSortedPosition(File expectedFile, File actualFile, int poslistField)
+            throws Exception {
         BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
         ArrayList<String> actualLines = new ArrayList<String>();
@@ -77,7 +78,7 @@
                 if (lineExpected == null) {
                     throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
                 }
-                if (!containStrings(lineExpected, actualLine)) {
+                if (!containStrings(lineExpected, actualLine, poslistField)) {
                     throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
                             + actualLine);
                 }
@@ -162,24 +163,32 @@
         return true;
     }
 
-    private static boolean containStrings(String lineExpected, String actualLine) {
-        String keyExp = lineExpected.split("\\\t")[0];
-        String keyAct = actualLine.split("\\\t")[0];
-        if (!keyAct.equals(keyExp)) {
+    private static boolean containStrings(String lineExpected, String actualLine, int poslistField) {
+        String[] fieldsExp = lineExpected.split("\\\t");
+        String[] fieldsAct = actualLine.split("\\\t");
+        if (fieldsAct.length != fieldsExp.length) {
             return false;
         }
+        for (int i = 0; i < fieldsAct.length; i++) {
+            if (i == poslistField) {
+                continue;
+            }
+            if (!fieldsAct[i].equals(fieldsExp[i])) {
+                return false;
+            }
+        }
 
         ArrayList<String> posExp = new ArrayList<String>();
         ArrayList<String> posAct = new ArrayList<String>();
 
-        String valueExp = lineExpected.split("\\\t")[1];
+        String valueExp = lineExpected.split("\\\t")[poslistField];
         String[] valuesExp = valueExp.substring(1, valueExp.length() - 1).split(",");
 
         for (String str : valuesExp) {
             posExp.add(str);
         }
 
-        String valueAct = actualLine.split("\\\t")[1];
+        String valueAct = actualLine.split("\\\t")[poslistField];
         String[] valuesAct = valueAct.substring(1, valueAct.length() - 1).split(",");
 
         for (String str : valuesAct) {
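
Note: containStrings (a private helper, shown here as if callable) now compares every tab-separated field exactly except the poslistField-th one, which is compared as an unordered position list; the tests pass field 1 for the kmer aggregate and field 2 for the kmer-to-readID output:

    // Field 1 differs only in ordering, so this pair is accepted:
    containStrings("AGAAG\t[(6,0),(6,3)]", "AGAAG\t[(6,3),(6,0)]", 1); // true
    // Any other field must match exactly:
    containStrings("AGAAG\t[(6,0)]", "AATAG\t[(6,0)]", 1);            // false
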
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 08f0f95..13190dd 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -2,4 +2,5 @@
 2	AATAGAAG
 3	AATAGAAG
 4	AATAGAAG
-5	AATAGAAG
\ No newline at end of file
+5	AATAGAAG
+6	AGAAGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
index 728c093..1091d2e 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_initial_read
@@ -1,3 +1,4 @@
+AAGAA	(6,2)
 AATAG	(1,0)
 AATAG	(2,0)
 AATAG	(3,0)
@@ -8,13 +9,16 @@
 AGAAG	(3,3)
 AGAAG	(4,3)
 AGAAG	(5,3)
+AGAAG	(6,0)
+AGAAG	(6,3)
 ATAGA	(1,1)
 ATAGA	(2,1)
 ATAGA	(3,1)
 ATAGA	(4,1)
 ATAGA	(5,1)
+GAAGA	(6,1)
 TAGAA	(1,2)
 TAGAA	(2,2)
 TAGAA	(3,2)
 TAGAA	(4,2)
-TAGAA	(5,2)
\ No newline at end of file
+TAGAA	(5,2)
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
new file mode 100644
index 0000000..0ca9de6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmer2readId
@@ -0,0 +1,24 @@
+1	0	[]	AATAG
+1	1	[]	ATAGA
+1	2	[]	TAGAA
+1	3	[(6,0)]	AGAAG
+2	0	[]	AATAG
+2	1	[]	ATAGA
+2	2	[]	TAGAA
+2	3	[(6,0)]	AGAAG
+3	0	[]	AATAG
+3	1	[]	ATAGA
+3	2	[]	TAGAA
+3	3	[(6,0)]	AGAAG
+4	0	[]	AATAG
+4	1	[]	ATAGA
+4	2	[]	TAGAA
+4	3	[(6,0)]	AGAAG
+5	0	[]	AATAG
+5	1	[]	ATAGA
+5	2	[]	TAGAA
+5	3	[(6,0)]	AGAAG
+6	0	[(1,3),(2,3),(3,3),(4,3),(5,3),(6,3)]	AGAAG
+6	1	[]	GAAGA
+6	2	[]	AAGAA
+6	3	[(6,0)]	AGAAG
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
index d5624d7..499200a 100644
--- a/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_kmerAggregate
@@ -1,4 +1,6 @@
+AAGAA	[(6,2)]
 AATAG	[(1,0),(2,0),(3,0),(4,0),(5,0)]
-AGAAG	[(1,3),(2,3),(3,3),(4,3),(5,3)]
+AGAAG	[(1,3),(2,3),(3,3),(4,3),(5,3),(6,0),(6,3)]
 ATAGA	[(1,1),(2,1),(3,1),(4,1),(5,1)]
-TAGAA	[(1,2),(2,2),(3,2),(4,2),(5,2)]
+GAAGA	[(6,1)]
+TAGAA	[(1,2),(2,2),(3,2),(4,2),(5,2)]
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_after_readIDAggreage