Finish the test framework: make Node/Position Writable types Serializable, refactor JobGenBrujinGraph into reusable pipeline stages with per-stage debug JobGens (JobGenCheckReader, JobGenCreateKmerInfo, JobGenMapKmerToRead, JobGenGroupbyReadID), retire JobGenStatistic, and rename JobRunTest to JobKmerGroupbyTest.
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 332b23b..7f78b68 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -3,10 +3,15 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.io.WritableComparable;
 
-public class NodeWritable implements WritableComparable<NodeWritable> {
+public class NodeWritable implements WritableComparable<NodeWritable>, Serializable {
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
     private PositionWritable nodeID;
     private int countOfKmer;
     private PositionListWritable incomingList;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index a3b5c84..a578543 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -3,13 +3,18 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.genomix.data.Marshal;
 
-public class PositionListWritable implements Writable, Iterable<PositionWritable> {
+public class PositionListWritable implements Writable, Iterable<PositionWritable>, Serializable {
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
     protected byte[] storage;
     protected int offset;
     protected int valueCount;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index c7f24ec..c19af12 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -3,13 +3,18 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 
 import edu.uci.ics.genomix.data.Marshal;
 
-public class PositionWritable implements WritableComparable<PositionWritable> {
+public class PositionWritable implements WritableComparable<PositionWritable>, Serializable {
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
     protected byte[] storage;
     protected int offset;
     public static final int LENGTH = 5;
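
These Writable types now also implement java.io.Serializable because instances are held as fields of Hyracks factories (for example, the PositionReference inside ReadsKeyValueParserFactory below), and those factories are shipped to node controllers through Java serialization. A minimal round-trip sketch, assuming the no-arg constructor and the set(int readID, byte posInRead) method that this patch uses elsewhere:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import edu.uci.ics.genomix.type.PositionWritable;

public class PositionSerializationCheck {
    public static void main(String[] args) throws Exception {
        PositionWritable pos = new PositionWritable();
        pos.set(42, (byte) 3); // readID 42, kmer starting at position 3 of the read

        // Java serialization, as Hyracks uses when distributing job artifacts.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(pos);
        oos.close();

        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        PositionWritable copy = (PositionWritable) ois.readObject();

        // The copy should compare equal and keep its fixed 5-byte layout.
        System.out.println(copy.compareTo(pos) == 0 && copy.getLength() == PositionWritable.LENGTH);
    }
}
```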
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
index fcc61d5..5cf44ba 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/NodeReference.java
@@ -2,7 +2,7 @@
 
 import edu.uci.ics.genomix.type.NodeWritable;
 
-public class NodeReference extends NodeWritable{
+public class NodeReference extends NodeWritable {
 
     public NodeReference(int kmerSize) {
         super(kmerSize);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
index 7b46fa5..76b10d0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -4,4 +4,5 @@
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
 public class PositionListReference extends PositionListWritable implements  IValueReference {
+
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
index 4c19595..026758a 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -4,4 +4,5 @@
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
 public class PositionReference extends PositionWritable implements IValueReference {
+
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
index 1bc2137..e91dd66 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -24,27 +24,37 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
 
 public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+
+    public static final int OutputKmerField = 0;
+    public static final int OutputPosition = 1;
+
     private KmerBytesWritable kmer;
+    private PositionReference pos;
     private boolean bReversed;
 
+    public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+            null });
+
     public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
         bReversed = bGenerateReversed;
         kmer = new KmerBytesWritable(k);
+        pos = new PositionReference();
     }
 
     @Override
@@ -107,13 +117,13 @@
 
             private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
                 try {
-                    if (posInRead > 127){
-                        throw new IllegalArgumentException ("Position id is beyond 127 at " + readID);
+                    if (posInRead > 127) {
+                        throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
                     }
                     tupleBuilder.reset();
                     tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
-                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
-                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
+                    pos.set(readID, (byte) posInRead);
+                    tupleBuilder.addField(pos.getByteArray(), pos.getStartOffset(), pos.getLength());
 
                     if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                             tupleBuilder.getSize())) {
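
The parser previously wrote readID and posInRead as two separate fields; it now packs them into one PositionReference field of PositionWritable.LENGTH bytes (which is also why the guard rejects posInRead > 127: the position must fit in a signed byte). The actual packing is done by PositionWritable through the project's Marshal helper; a hypothetical equivalent, assuming the conventional big-endian 4-byte readID followed by a 1-byte position:

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for PositionWritable.set(readID, posInRead);
// the real byte layout is owned by edu.uci.ics.genomix.data.Marshal.
final class PositionLayoutSketch {
    static byte[] pack(int readID, byte posInRead) {
        // 4 bytes of readID + 1 byte of position = PositionWritable.LENGTH (5)
        return ByteBuffer.allocate(5).putInt(readID).put(posInRead).array();
    }

    public static void main(String[] args) {
        System.out.println(pack(42, (byte) 3).length); // prints 5
    }
}
```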
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
index 0772eb3..0508ba7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerSequenceWriterFactory.java
@@ -26,6 +26,8 @@
 
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,6 +42,9 @@
     private ConfFactory confFactory;
     private final int kmerlength;
 
+    public static final int InputKmerField = 0;
+    public static final int InputPositionListField = 1;
+
     public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
         this.confFactory = new ConfFactory(conf);
         this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
@@ -54,6 +59,7 @@
         Writer writer = null;
 
         KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+        PositionListWritable plist = new PositionListWritable();
 
         /**
          * assumption is that output never change source!
@@ -61,18 +67,18 @@
         @Override
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             try {
-                byte[] kmer = tuple.getFieldData(0);
-                int keyStart = tuple.getFieldStart(0);
-                int keyLength = tuple.getFieldLength(0);
-                if (reEnterKey.getLength() > keyLength){
+                if (reEnterKey.getLength() > tuple.getFieldLength(InputKmerField)) {
                     throw new IllegalArgumentException("Not enough kmer bytes");
                 }
+                reEnterKey.setNewReference(tuple.getFieldData(InputKmerField), tuple.getFieldStart(InputKmerField));
+                int countOfPos = tuple.getFieldLength(InputPositionListField);
+                if (countOfPos % PositionWritable.LENGTH != 0) {
+                    throw new IllegalArgumentException("Invalid count of position byte");
+                }
+                plist.setNewReference(countOfPos, tuple.getFieldData(InputPositionListField),
+                        tuple.getFieldStart(InputPositionListField));
 
-                byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
-                byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
-//                reEnterCount.set(bitmap, count);
-                reEnterKey.set(kmer, keyStart);
-                writer.append(reEnterKey, null);
+                writer.append(reEnterKey, plist);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -82,7 +88,7 @@
         public void open(DataOutput output) throws HyracksDataException {
             try {
                 writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
-                        null, CompressionType.NONE, null);
+                        PositionListWritable.class, CompressionType.NONE, null);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
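
With the value class switched from null to PositionListWritable, the job now emits a SequenceFile of (kmer, position list) pairs. A sketch for inspecting one output part file, using Hadoop's old-API reader to match the JobConf-era code above; the kmer length 55 is a placeholder and must match the job's configured k:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;

public class DumpKmerSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // one part file written by the job
        SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
        KmerBytesWritable kmer = new KmerBytesWritable(55); // placeholder k
        PositionListWritable plist = new PositionListWritable();
        while (reader.next(kmer, plist)) {
            System.out.println(kmer + "\t" + plist);
        }
        reader.close();
    }
}
```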
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
index eac0045..664a2d2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/KMerTextWriterFactory.java
@@ -19,6 +19,8 @@
 import java.io.IOException;
 
 import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -32,21 +34,31 @@
 	 */
     private static final long serialVersionUID = 1L;
     private KmerBytesWritable kmer;
+    private PositionListWritable plist;
 
     public KMerTextWriterFactory(int k) {
         kmer = new KmerBytesWritable(k);
+        plist = new PositionListWritable();
     }
 
     public class TupleWriter implements ITupleWriter {
         @Override
         public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
             try {
-                kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0));
+                if (kmer.getLength() > tuple.getFieldLength(KMerSequenceWriterFactory.InputKmerField)) {
+                    throw new IllegalArgumentException("Not enough kmer bytes");
+                }
+                kmer.setNewReference(tuple.getFieldData(KMerSequenceWriterFactory.InputKmerField), tuple.getFieldStart(KMerSequenceWriterFactory.InputKmerField));
+                int countOfPos = tuple.getFieldLength(KMerSequenceWriterFactory.InputPositionListField);
+                if (countOfPos % PositionWritable.LENGTH != 0) {
+                    throw new IllegalArgumentException("Invalid count of position byte");
+                }
+                plist.setNewReference(countOfPos, tuple.getFieldData(KMerSequenceWriterFactory.InputPositionListField),
+                        tuple.getFieldStart(KMerSequenceWriterFactory.InputPositionListField));
+
                 output.write(kmer.toString().getBytes());
                 output.writeByte('\t');
-//                output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
-                output.writeByte('\t');
-                output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
+                output.write(plist.toString().getBytes());
                 output.writeByte('\n');
             } catch (IOException e) {
                 throw new HyracksDataException(e);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
new file mode 100644
index 0000000..32e7b4d
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/MapperKmerToReadIDTextWriterFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class MapperKmerToReadIDTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public MapperKmerToReadIDTextWriterFactory(int kmerSize) {
+        // TODO: use kmerSize once the tuple writer below is implemented
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        // TODO: emit (ReadID, PosInRead, {OtherPosition,...}, Kmer) tuples as text; returns null until then
+        return null;
+    }
+
+}
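
As written, getTupleWriter returns null, so Plan.OUTPUT_MAP_KMER_TO_READ would fail at runtime; this factory is still a placeholder. A hedged sketch of the writer a completed factory could return, following the anonymous ITupleWriter pattern that JobGenCheckReader uses later in this patch. Since the byte encoding of the mapper's (ReadID, PosInRead, {OtherPosition,...}, Kmer) output is not pinned down here, the sketch dumps each field as hex rather than decoding it:

```java
import java.io.DataOutput;
import java.io.IOException;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;

// Sketch only: a format-agnostic text dump of the mapper output tuples.
public class MapperKmerToReadIDTextWriterSketch implements ITupleWriter {

    @Override
    public void open(DataOutput output) throws HyracksDataException {
        // no header is needed for a plain-text dump
    }

    @Override
    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
        try {
            // One tab-separated hex column per field, one line per tuple.
            for (int f = 0; f < tuple.getFieldCount(); f++) {
                byte[] data = tuple.getFieldData(f);
                int start = tuple.getFieldStart(f);
                for (int i = 0; i < tuple.getFieldLength(f); i++) {
                    output.write(String.format("%02x", data[start + i]).getBytes());
                }
                output.writeByte(f == tuple.getFieldCount() - 1 ? '\n' : '\t');
            }
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }

    @Override
    public void close(DataOutput output) throws HyracksDataException {
        // nothing to flush for the text dump
    }
}
```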
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
index 272a21c..7de3b54 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/NodeSequenceWriterFactory.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 
+@SuppressWarnings("deprecation")
 public class NodeSequenceWriterFactory implements ITupleWriterFactory {
 
     /**
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
new file mode 100644
index 0000000..28b8484
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/io/ReadIDAggregationTextWriterFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.genomix.hyracks.dataflow.io;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class ReadIDAggregationTextWriterFactory implements ITupleWriterFactory {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public ReadIDAggregationTextWriterFactory(int kmerSize) {
+        // TODO: use kmerSize once the tuple writer below is implemented
+    }
+
+    @Override
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+        // TODO: emit the grouped (ReadID, {(PosInRead,{OtherPosition,...},Kmer)...}) records as text; returns null until then
+        return null;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index d958205..68a83ac 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -27,6 +27,10 @@
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.hyracks.job.JobGen;
 import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
+import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
+import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
+import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -38,9 +42,11 @@
 
 public class Driver {
     public static enum Plan {
+        CHECK_KMERREADER,
+        OUTPUT_KMERHASHTABLE,
+        OUTPUT_MAP_KMER_TO_READ,
+        OUTPUT_GROUPBY_READID,
         BUILD_DEBRUJIN_GRAPH,
-        GRAPH_CLEANNING,
-        CONTIGS_GENERATION,
     }
 
     private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -91,10 +97,22 @@
                 default:
                     jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
                     break;
+                case OUTPUT_KMERHASHTABLE:
+                    jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case OUTPUT_MAP_KMER_TO_READ:
+                    jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case OUTPUT_GROUPBY_READID:
+                    jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+                case CHECK_KMERREADER:
+                    jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
             }
 
             start = System.currentTimeMillis();
-            runCreate(jobGen);
+            run(jobGen);
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("result writing finished " + time + "ms");
@@ -104,7 +118,7 @@
         }
     }
 
-    private void runCreate(JobGen jobGen) throws Exception {
+    private void run(JobGen jobGen) throws Exception {
         try {
             JobSpecification createJob = jobGen.generateJob();
             execute(createJob);
@@ -134,6 +148,7 @@
         boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
         // FileInputFormat.setInputPaths(job, otherArgs[2]);
         {
+            @SuppressWarnings("deprecation")
             Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
             jobConf.set("mapred.input.dir", path.toString());
 
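
With each Plan value now routing to exactly one JobGen (the added break statements prevent the cases from falling through), the intermediate outputs can be exercised one at a time. A hypothetical invocation; the Driver constructor and runJob signature are not shown in this hunk, so both are assumptions, and only the Plan constants come from this patch:

```java
import edu.uci.ics.genomix.hyracks.driver.Driver;
import edu.uci.ics.genomix.hyracks.job.GenomixJob;

public class RunDebugPlan {
    public static void main(String[] args) throws Exception {
        GenomixJob job = new GenomixJob();                 // assumes a no-arg constructor
        job.set("mapred.input.dir", "/genomix/input");     // hypothetical input path
        Driver driver = new Driver("localhost", 3099, 2);  // assumed (host, port, partitions per machine)
        driver.runJob(job, Driver.Plan.CHECK_KMERREADER, true /* profiling */);
    }
}
```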
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
index 3563f93..d17d9ef 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -15,26 +15,31 @@
 
 package edu.uci.ics.genomix.hyracks.job;
 
+import java.io.Serializable;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 
-public abstract class JobGen {
+public abstract class JobGen implements Serializable {
 
-    protected final Configuration conf;
-    protected final GenomixJob genomixJob;
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+    protected final ConfFactory confFactory;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
 
-    public JobGen(GenomixJob job) {
-        this.conf = job;
-        this.genomixJob = job;
+    public JobGen(GenomixJob job) throws HyracksDataException {
+        this.confFactory = new ConfFactory(job);
         this.initJobConfiguration();
     }
 
-    protected abstract void initJobConfiguration();
+    protected abstract void initJobConfiguration() throws HyracksDataException;
 
     public abstract JobSpecification generateJob() throws HyracksException;
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 117b7e0..4adddb6 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -19,6 +19,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -66,11 +67,17 @@
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenBrujinGraph extends JobGen {
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
     public enum GroupbyType {
         EXTERNAL,
         PRECLUSTER,
@@ -82,26 +89,26 @@
         BINARY,
     }
 
-    JobConf job;
-    private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
-    private Scheduler scheduler;
-    private String[] ncNodeNames;
+    protected ConfFactory hadoopJobConfFactory;
+    protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    protected Scheduler scheduler;
+    protected String[] ncNodeNames;
 
-    private int readLength;
-    private int kmerSize;
-    private int frameLimits;
-    private int frameSize;
-    private int tableSize;
-    private GroupbyType groupbyType;
-    private OutputFormat outputFormat;
-    private boolean bGenerateReversedKmer;
+    protected int readLength;
+    protected int kmerSize;
+    protected int frameLimits;
+    protected int frameSize;
+    protected int tableSize;
+    protected GroupbyType groupbyType;
+    protected OutputFormat outputFormat;
+    protected boolean bGenerateReversedKmer;
 
-    private void logDebug(String status) {
+    protected void logDebug(String status) {
         LOG.debug(status + " nc nodes:" + ncNodeNames.length);
     }
 
     public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
-            int numPartitionPerMachine) {
+            int numPartitionPerMachine) throws HyracksDataException {
         super(job);
         this.scheduler = scheduler;
         String[] nodes = new String[ncMap.size()];
@@ -150,45 +157,41 @@
                 obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
                         new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
                         finalRec);
+                jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
                 break;
         }
         return obj;
     }
 
-    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec, RecordDescriptor outRec)
-            throws HyracksDataException {
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
         try {
 
-            InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
 
             LOG.info("HDFS read into " + splits.length + " splits");
             String[] readSchedule = scheduler.getLocationConstraints(splits);
-            return new HDFSReadOperatorDescriptor(jobSpec, outRec, job, splits, readSchedule,
-                    new ReadsKeyValueParserFactory(kmerSize, bGenerateReversedKmer));
+            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(kmerSize,
+                            bGenerateReversedKmer));
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
-    private void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
             IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
         jobSpec.connect(conn, preOp, 0, nextOp, 0);
     }
 
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
+    public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readOperator) throws HyracksDataException {
 
-        JobSpecification jobSpec = new JobSpecification();
-        RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null });
         RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         jobSpec.setFrameSize(frameSize);
 
-        // File input
-        logDebug("ReadKmer Operator");
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec, readKmerOutputRec);
-
         Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateKmerAggregateFactory(),
                 new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
                 new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
@@ -201,21 +204,27 @@
         IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
         AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
         connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+        return kmerCrossAggregator;
+    }
 
-        logDebug("Map Kmer to Read Operator");
+    public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) {
         //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,{OtherPosition,...},Kmer) 
         RecordDescriptor readIDOutputRec = new RecordDescriptor(
                 new ISerializerDeserializer[] { null, null, null, null });
         AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, readIDOutputRec);
         connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
+        return mapKmerToRead;
+    }
 
-        logDebug("Group by Read Operator");
+    public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
         // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...} 
         RecordDescriptor readIDCombineRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
         RecordDescriptor readIDFinalRec = new RecordDescriptor(
                 new ISerializerDeserializer[MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
-        objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
+        Object[] objs = generateAggeragateDescriptorbyType(jobSpec, new AggregateReadIDAggregateFactory(),
                 new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(),
                 new ReadIDNormarlizedComputeFactory(), IntegerPointable.FACTORY, readIDCombineRec, readIDFinalRec);
         AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
@@ -226,48 +235,96 @@
         IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
         AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
         connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+        return readCrossAggregator;
+    }
 
-        logDebug("Map ReadInfo to Node");
+    public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) {
         //Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList, OutgoingList, Kmer)
         RecordDescriptor nodeRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null });
         AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec, nodeRec, kmerSize);
         connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
+        return mapEachReadToNode;
+    }
 
+    public AbstractOperatorDescriptor generateRootByWriteKmerGroupbyResult(JobSpecification jobSpec,
+            AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
         // Output Kmer
         ITupleWriterFactory kmerWriter = null;
-        ITupleWriterFactory nodeWriter = null;
         switch (outputFormat) {
             case TEXT:
                 kmerWriter = new KMerTextWriterFactory(kmerSize);
+                break;
+            case BINARY:
+            default:
+                kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+                break;
+        }
+        logDebug("WriteOperator");
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), kmerWriter);
+        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+    public AbstractOperatorDescriptor generateRootByWriteNodeResult(JobSpecification jobSpec,
+            AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+        ITupleWriterFactory nodeWriter = null;
+        switch (outputFormat) {
+            case TEXT:
                 nodeWriter = new NodeTextWriterFactory(kmerSize);
                 break;
             case BINARY:
             default:
-                kmerWriter = new KMerSequenceWriterFactory(job);
-                nodeWriter = new NodeSequenceWriterFactory(job);
+                nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
                 break;
         }
         logDebug("WriteOperator");
-        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, kmerWriter);
-        connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
-                new OneToOneConnectorDescriptor(jobSpec));
-        jobSpec.addRoot(writeKmerOperator);
-
         // Output Node
-        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, nodeWriter);
+        HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), nodeWriter);
         connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
                 new OneToOneConnectorDescriptor(jobSpec));
         jobSpec.addRoot(writeNodeOperator);
-
-        if (groupbyType == GroupbyType.PRECLUSTER) {
-            jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        }
-        return jobSpec;
+        return writeNodeOperator;
     }
 
     @Override
-    protected void initJobConfiguration() {
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        logDebug("Group by Read Operator");
+        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+        logDebug("Map ReadInfo to Node");
+        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+
+        logDebug("Write node to result");
+        generateRootByWriteNodeResult(jobSpec, lastOperator);
+
+        return jobSpec;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void initJobConfiguration() throws HyracksDataException {
+        Configuration conf = confFactory.getConf();
         readLength = conf.getInt(GenomixJob.READ_LENGTH, GenomixJob.DEFAULT_READLEN);
         kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
         if (kmerSize % 2 == 0) {
@@ -295,7 +352,7 @@
         } else {
             outputFormat = OutputFormat.BINARY;
         }
-        job = new JobConf(conf);
+        hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
         LOG.info("Genomix Graph Build Configuration");
         LOG.info("Kmer:" + kmerSize);
         LOG.info("Groupby type:" + type);
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
new file mode 100644
index 0000000..a18c280
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCheckReader(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+        // all configuration is inherited from JobGenBrujinGraph
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerReader(jobSpec, readOperator);
+
+        return jobSpec;
+    }
+
+    public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+            HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+        // Output Kmer
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+            /**
+             * Version id for Java serialization.
+             */
+            private static final long serialVersionUID = 1L;
+            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+            private PositionWritable pos = new PositionWritable();
+
+            @Override
+            public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                return new ITupleWriter() {
+
+                    @Override
+                    public void open(DataOutput output) throws HyracksDataException {
+                        // no header is needed for the text dump
+
+                    }
+
+                    @Override
+                    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                        try {
+                            if (kmer.getLength() > tuple.getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+                                throw new IllegalArgumentException("Not enough kmer bytes");
+                            }
+                            kmer.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField), tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+                            pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
+                                    tuple.getFieldStart(ReadsKeyValueParserFactory.OutputPosition));
+
+                            output.write(kmer.toString().getBytes());
+                            output.writeByte('\t');
+                            output.write(pos.toString().getBytes());
+                            output.writeByte('\n');
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+
+                    @Override
+                    public void close(DataOutput output) throws HyracksDataException {
+                        // nothing to flush for the text dump
+
+                    }
+
+                };
+            }
+
+        });
+        connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
new file mode 100644
index 0000000..2c1a70a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCreateKmerInfo.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCreateKmerInfo(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+        // all configuration is inherited from JobGenBrujinGraph
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+        return jobSpec;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
new file mode 100644
index 0000000..7649100
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenGroupbyReadID.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.io.ReadIDAggregationTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenGroupbyReadID extends JobGenBrujinGraph {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenGroupbyReadID(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+        // all configuration is inherited from JobGenBrujinGraph
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        logDebug("Group by Read Operator");
+        lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+        logDebug("Map ReadInfo to Node");
+        lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+
+        logDebug("Write node to result");
+        generateRootByWriteReadIDAggregationResult(jobSpec, lastOperator);
+
+        return jobSpec;
+    }
+
+    public AbstractOperatorDescriptor generateRootByWriteReadIDAggregationResult(JobSpecification jobSpec,
+            AbstractOperatorDescriptor readCrossAggregator) throws HyracksException {
+        ITupleWriterFactory readWriter = new ReadIDAggregationTextWriterFactory(kmerSize);
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), readWriter);
+        connectOperators(jobSpec, readCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
new file mode 100644
index 0000000..76499e6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenMapKmerToRead.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.io.MapperKmerToReadIDTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenMapKmerToRead extends JobGenBrujinGraph {
+
+    /**
+     * Version id for Java serialization.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenMapKmerToRead(GenomixJob job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+        // all configuration is inherited from JobGenBrujinGraph
+    }
+
+    public AbstractOperatorDescriptor generateRootByWriteMapperFromKmerToReadID(JobSpecification jobSpec, AbstractOperatorDescriptor mapper) throws HyracksException {
+        // Output Kmer
+        ITupleWriterFactory kmerWriter = new MapperKmerToReadIDTextWriterFactory(kmerSize);
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec, hadoopJobConfFactory.getConf(), kmerWriter);
+        connectOperators(jobSpec, mapper, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Group by Kmer");
+        AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+        logDebug("Map Kmer to Read Operator");
+        lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+        generateRootByWriteMapperFromKmerToReadID(jobSpec, lastOperator);
+
+        return jobSpec;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
deleted file mode 100644
index adce3f6..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.hyracks.job;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
-import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
-import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
-
-public class JobGenStatistic extends JobGen {
-    private int kmers;
-    private JobConf hadoopjob;
-    private RecordDescriptor readOutputRec;
-    private String[] ncNodeNames;
-    private Scheduler scheduler;
-    private RecordDescriptor combineOutputRec;
-
-    public JobGenStatistic(GenomixJob job) {
-        super(job);
-        // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    protected void initJobConfiguration() {
-        // TODO Auto-generated method stub
-        kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMERLEN);
-        hadoopjob = new JobConf(conf);
-        hadoopjob.setInputFormat(SequenceFileInputFormat.class);
-    }
-
-    @Override
-    public JobSpecification generateJob() throws HyracksException {
-        int[] degreeFields = { 0, 1 }; // indegree, outdegree
-        int[] countFields = { 2 };
-        JobSpecification jobSpec = new JobSpecification();
-        /** specify the record fields after read */
-        readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
-                ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
-        combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
-                ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        /** the reader */
-        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
-
-        /** the combiner aggregator */
-        AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
-        AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
-
-        /** the final aggregator */
-        AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
-        AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
-
-        /** writer */
-        AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
-        AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
-        jobSpec.addRoot(writeDegree);
-        jobSpec.addRoot(writeCount);
-        return jobSpec;
-    }
-
-    private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
-        try {
-
-            InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
-
-            String[] readSchedule = scheduler.getLocationConstraints(splits);
-            return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
-                    null); //new StatReadsKeyValueParserFactory());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
-            IAggregatorDescriptorFactory aggregator) {
-        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
-                new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
-                aggregator, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                                new ByteComparatorFactory(), new ByteComparatorFactory() }),
-                        GenomixJob.DEFAULT_TABLE_SIZE), true);
-    }
-
-    private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
-            HDFSReadOperatorDescriptor readOperator) {
-        AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
-                new StatCountAggregateFactory());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
-        return localAggregator;
-    }
-
-    private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
-            AbstractOperatorDescriptor localAggregator) {
-        AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
-        // only need one reducer
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
-                % ncNodeNames.length]);
-        IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
-                new ITuplePartitionComputerFactory() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public ITuplePartitionComputer createPartitioner() {
-                        return new ITuplePartitionComputer() {
-                            @Override
-                            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
-                                    throws HyracksDataException {
-                                return 0;
-                            }
-                        };
-                    }
-                }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
-        jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
-        return finalAggregator;
-    }
-
-    private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
-            AbstractOperatorDescriptor finalAggregator) {
-        LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
-                % ncNodeNames.length]);
-
-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
-        jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
-        return writeOperator;
-    }
-}
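
Side note on the JobGenStatistic deletion above: its final-aggregation step funnels every tuple to partition 0 via an anonymous ITuplePartitionComputerFactory. Should that statistics job ever be revived, the trick is worth keeping as a small named class. A minimal sketch against the Hyracks API (the class name SinglePartitionComputerFactory is illustrative, not something in this codebase):

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    // Illustrative helper: routes every tuple to partition 0 so that a single
    // node performs the final aggregation, mirroring the anonymous factory in
    // connectFinalAggregateByField above.
    public class SinglePartitionComputerFactory implements ITuplePartitionComputerFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public ITuplePartitionComputer createPartitioner() {
            return new ITuplePartitionComputer() {
                @Override
                public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
                        throws HyracksDataException {
                    return 0; // every tuple ends up in the first partition
                }
            };
        }
    }

Routing everything to one partition is what makes the MToNPartitioningMergingConnectorDescriptor behave as a single global merger.
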
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
similarity index 99%
rename from genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
rename to genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
index 4a7c8aa..ae2838d 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobKmerGroupbyTest.java
@@ -43,7 +43,7 @@
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 
-public class JobRunTest {
+public class JobKmerGroupbyTest {
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index d41bcbe..fca0607 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -7,6 +7,8 @@
 import java.io.FileWriter;
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -19,23 +21,28 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
 import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
 import edu.uci.ics.genomix.hyracks.job.GenomixJob;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 
 public class JobRunStepByStepTest {
+    private static final int KMER_SIZE = 5;
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
+    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
     private static final String HDFS_INPUT_PATH = "/webmap";
     private static final String HDFS_OUTPUT_PATH = "/webmap_result";
+
+    private static final String EXPECTED_DIR = "src/test/resources/expected/";
+    private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
 
     private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
     private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
-    private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
-    private static final String EXPECTED_REVERSE_PATH = "src/test/resources/expected/result_reverse";
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
     private MiniDFSCluster dfsCluster;
 
@@ -44,6 +51,18 @@
     private int numPartitionPerMachine = 1;
 
     private Driver driver;
+
+    @Test
+    public void testAll() throws Exception {
+        testReader();
+    }
+
+    public void testReader() throws Exception {
+        conf.set(GenomixJob.GROUPBY_TYPE, "external");
+        System.err.println("Testing ExternalGroupBy");
+        driver.runJob(new GenomixJob(conf), Plan.CHECK_KMERREADER, true);
+        Assert.assertTrue(checkResults(EXPECTED_READER_RESULT));
+    }
 
     @Before
     public void setUp() throws Exception {
@@ -56,7 +75,7 @@
         FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
         FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
-        conf.setInt(GenomixJob.KMER_LENGTH, 5);
+        conf.setInt(GenomixJob.KMER_LENGTH, KMER_SIZE);
         driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
                 edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
     }
@@ -77,7 +96,7 @@
         System.setProperty("hadoop.log.dir", "logs");
         dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
         FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_PATH);
+        Path src = new Path(DATA_INPUT_PATH);
         Path dest = new Path(HDFS_INPUT_PATH);
         dfs.mkdirs(dest);
         //dfs.mkdirs(result);
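
The new testReader path asserts on checkResults, which is presumably defined further down in JobRunStepByStepTest and is not visible in this hunk. As a rough idea of what such a check does — compare the merged HDFS dump against a file under src/test/resources/expected/ — a simplified line-by-line comparison might look like this (a hypothetical stand-in, not the project's actual helper):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    // Simplified, hypothetical stand-in for the checkResults helper: true when
    // the actual output matches the expected file line by line. The real helper
    // in JobRunStepByStepTest may normalize or reorder lines before comparing.
    public class LineByLineChecker {
        public static boolean sameLines(String expectedPath, String actualPath) throws IOException {
            BufferedReader expected = new BufferedReader(new FileReader(expectedPath));
            BufferedReader actual = new BufferedReader(new FileReader(actualPath));
            try {
                String e;
                while ((e = expected.readLine()) != null) {
                    if (!e.equals(actual.readLine())) {
                        return false; // mismatch, or actual ended early
                    }
                }
                return actual.readLine() == null; // actual must not have extra lines
            } finally {
                expected.close();
                actual.close();
            }
        }
    }
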
diff --git a/genomix/genomix-hyracks/src/test/resources/data/0/text.txt b/genomix/genomix-hyracks/src/test/resources/data/0/text.txt
deleted file mode 100755
index f63a141..0000000
--- a/genomix/genomix-hyracks/src/test/resources/data/0/text.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-@625E1AAXX100810:1:100:10000:10271/1
-AATAGAAG
-+
-EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
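
For context, the deleted data/0/text.txt held one FASTQ record: an @-prefixed read header, the bases, a '+' separator, and a quality line. Reading such a four-line record is straightforward; a minimal sketch (illustrative code, not from this repository):

    import java.io.BufferedReader;
    import java.io.StringReader;

    // Illustrative sketch: reading one four-line FASTQ record like the
    // deleted test file (header, sequence, separator, quality scores).
    public class FastqRecordExample {
        public static void main(String[] args) throws Exception {
            String record = "@625E1AAXX100810:1:100:10000:10271/1\n"
                    + "AATAGAAG\n"
                    + "+\n"
                    + "EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?\n";
            BufferedReader r = new BufferedReader(new StringReader(record));
            String header = r.readLine();    // "@..." read identifier
            String sequence = r.readLine();  // the bases, "AATAGAAG"
            String separator = r.readLine(); // "+"
            String quality = r.readLine();   // per-base quality scores
            System.out.println(header + " : " + sequence);
        }
    }
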
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index 9874fc2..08f0f95 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,8 +1,5 @@
-@625E1AAXX100810:1:100:10000:10271/1
-AATAGAAG
-AATAGAAG
-+
-EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
-AATAGAAG
-AATAGAAG
-AATAGAAG
\ No newline at end of file
+1	AATAGAAG
+2	AATAGAAG
+3	AATAGAAG
+4	AATAGAAG
+5	AATAGAAG
\ No newline at end of file
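
The webmap input likewise drops the FASTQ framing: each line is now a numeric read id and its sequence separated by a tab, which is simpler for the k-mer reader tests to consume and verify. A minimal parsing sketch (class and variable names are illustrative):

    // Hypothetical example of parsing the new tab-separated test input,
    // where each line is "<read id>\t<sequence>".
    public class TabbedReadParserExample {
        public static void main(String[] args) {
            String line = "1\tAATAGAAG";             // one record from webmap/text.txt
            String[] parts = line.split("\t", 2);    // split the id from the sequence
            int readId = Integer.parseInt(parts[0]); // 1
            String sequence = parts[1];              // "AATAGAAG"
            System.out.println(readId + " -> " + sequence);
        }
    }
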