new graph driver: add KmerListWritable and the newgraph dataflow/driver/job skeletons
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 25eeadb..a9c1183 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -39,7 +39,7 @@
     private static final long serialVersionUID = 1L;
     private static final byte[] EMPTY_BYTES = {};
 
-    protected int size;
+    public int size;
     protected byte[] bytes;
     protected int offset;
     protected int kmerlength;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
new file mode 100644
index 0000000..c4a6865
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -0,0 +1,144 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+public class KmerListWritable implements Writable, Iterable<KmerBytesWritable>, Serializable{
+    private static final long serialVersionUID = 1L;
+    protected byte[] storage;
+    protected int offset;
+    protected int valueCount;
+    public int KMER_LENGTH = 3; // byte width of one stored kmer element (the stride used by append/getPosition)
+    protected static final byte[] EMPTY = {};
+    
+    protected KmerBytesWritable posIter = new KmerBytesWritable();
+    
+    public KmerListWritable() {
+        this.storage = EMPTY;
+        this.valueCount = 0;
+        this.offset = 0;
+    }
+    
+    public KmerListWritable(int kmerLength) {
+        this();
+        this.KMER_LENGTH = kmerLength;
+    }
+    
+    public KmerListWritable(int count, byte[] data, int offset) {
+        setNewReference(count, data, offset);
+    }
+    
+    public KmerListWritable(List<KmerBytesWritable> kmers) {
+        this();
+        setSize(kmers.size() * KMER_LENGTH);  // reserve space for all elements, in bytes
+        for (KmerBytesWritable kmer : kmers) {
+            append(kmer);
+        }
+    }
+    
+    public void setNewReference(int count, byte[] data, int offset) {
+        this.valueCount = count;
+        this.storage = data;
+        this.offset = offset;
+    }
+    
+    public void append(KmerBytesWritable kmer){
+        setSize((1 + valueCount) * KMER_LENGTH);
+        // copy to the tail of the list (not the list head) and honor the kmer's own offset
+        System.arraycopy(kmer.getBytes(), kmer.getOffset(), storage, offset + valueCount * KMER_LENGTH, KMER_LENGTH);
+        valueCount += 1;
+    }
+    
+    protected void setSize(int size) {
+        if (size > getCapacity()) {
+            setCapacity((size * 3 / 2));
+        }
+    }
+    
+    protected int getCapacity() {
+        return storage.length - offset;
+    }
+
+    protected void setCapacity(int new_cap) {
+        if (new_cap > getCapacity()) {
+            byte[] new_data = new byte[new_cap];
+            if (storage.length - offset > 0) {
+                System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
+            }
+            storage = new_data;
+            offset = 0;
+        }
+    }
+    
+    public void reset() {
+        valueCount = 0;
+    }
+    
+    public KmerBytesWritable getPosition(int i) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such position: " + i);
+        }
+        posIter.setNewReference(storage, offset + i * KMER_LENGTH);
+        return posIter;
+    }
+    
+    @Override
+    public Iterator<KmerBytesWritable> iterator() {
+        Iterator<KmerBytesWritable> it = new Iterator<KmerBytesWritable>() {
+
+            private int currentIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return currentIndex < valueCount;
+            }
+
+            @Override
+            public KmerBytesWritable next() {
+                return getPosition(currentIndex++);
+            }
+
+            @Override
+            public void remove() {
+                if(currentIndex < valueCount)
+                    System.arraycopy(storage, offset + currentIndex * KMER_LENGTH, 
+                          storage, offset + (currentIndex - 1) * KMER_LENGTH, 
+                          (valueCount - currentIndex) * KMER_LENGTH);
+                valueCount--;
+                currentIndex--;
+            }
+        };
+        return it;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.valueCount = in.readInt();
+        setSize(valueCount * KMER_LENGTH);
+        in.readFully(storage, offset, valueCount * KMER_LENGTH);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(valueCount);
+        out.write(storage, offset, valueCount * KMER_LENGTH);
+    }
+    
+    public int getCountOfPosition() {
+        return valueCount;
+    }
+
+    public byte[] getByteArray() {
+        return storage;
+    }
+
+    public int getStartOffset() {
+        return offset;
+    }
+}
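
A minimal usage sketch of the new KmerListWritable (not part of the patch; it assumes
KMER_LENGTH is the per-element byte width, which is how append() and getPosition()
stride the backing array):

    KmerBytesWritable kmer = new KmerBytesWritable(5);
    kmer.setByRead("AGCTA".getBytes(), 0);             // pack the first 5 letters

    KmerListWritable list = new KmerListWritable(kmer.getLength()); // byte width per element
    list.append(kmer);                                 // copies the kmer bytes to the tail
    for (KmerBytesWritable k : list) {
        System.out.println(k);                         // note: the same posIter object is reused
    }
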
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
index c3ec3c7..8f728cd 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenCheckReader.java
@@ -18,7 +18,7 @@
 import java.io.IOException;
 import java.util.Map;
 
-import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.oldtype.PositionWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..ae511e8
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+
+    public static final int OutputKmerField = 0;
+    public static final int OutputPosition = 1;
+    public static final int OutputKmerListField = 2; // reserved; readKmerOutputRec only carries the two fields above
+
+    private final boolean bReversed;
+    private final int readLength;
+    private final int kmerSize;
+
+    public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+            null });
+
+    public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
+        bReversed = bGenerateReversed;
+        this.readLength = readlength;
+        this.kmerSize = k;
+    }
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+
+        return new IKeyValueParser<LongWritable, Text>() {
+
+            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+            private PositionReference pos = new PositionReference();
+            private Pattern genePattern = Pattern.compile("[AGCT]+"); // compile once, not per record
+
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+                String[] geneLine = value.toString().split("\\t"); // line format: readID<TAB>sequence
+                if (geneLine.length != 2) {
+                    return;
+                }
+                int readID = 0;
+                try {
+                    readID = Integer.parseInt(geneLine[0]);
+                } catch (NumberFormatException e) {
+                    LOG.warn("Invalid data ");
+                    return;
+                }
+
+                Matcher geneMatcher = genePattern.matcher(geneLine[1]);
+                boolean isValid = geneMatcher.matches();
+                if (isValid) {
+                    if (geneLine[1].length() != readLength) {
+                        LOG.warn("Invalid readlength at: " + readID);
+                        return;
+                    }
+                    SplitReads(readID, geneLine[1].getBytes(), writer);
+                }
+            }
+
+            private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
+                /** first kmer */
+                if (kmerSize >= array.length) {
+                    return;
+                }
+                kmer.setByRead(array, 0);
+                InsertToFrame(kmer, readID, 1, writer);
+
+                /** middle kmer */
+                for (int i = kmerSize; i < array.length; i++) {
+                    kmer.shiftKmerWithNextChar(array[i]);
+                    InsertToFrame(kmer, readID, i - kmerSize + 2, writer);
+                }
+
+                if (bReversed) {
+                    /** first kmer */
+                    kmer.setByReadReverse(array, 0);
+                    InsertToFrame(kmer, readID, -1, writer);
+                    /** middle kmer */
+                    for (int i = kmerSize; i < array.length; i++) {
+                        kmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
+                        InsertToFrame(kmer, readID, -(i - kmerSize + 2), writer);
+                    }
+                }
+            }
+
+            private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
+                try {
+                    if (Math.abs(posInRead) > 127) {
+                        throw new IllegalArgumentException("Position id is beyond 127 at " + readID);
+                    }
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+                    pos.set(readID, (byte) posInRead);
+                    tupleBuilder.addField(pos.getByteArray(), pos.getStartOffset(), pos.getLength());
+
+                    if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy a record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            @Override
+            public void open(IFrameWriter writer) throws HyracksDataException {
+            }
+
+            @Override
+            public void close(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+        };
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
new file mode 100644
index 0000000..64d359d
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.genomix.hyracks.newgraph.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+    public static enum Plan {
+        CHECK_KMERREADER,
+    }
+    
+    private static final String IS_PROFILING = "genomix.driver.profiling";
+    private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+    private static final Log LOG = LogFactory.getLog(Driver.class);
+    private JobGen jobGen;
+    private boolean profiling;
+    private int numPartitionPerMachine;
+
+    private IHyracksClientConnection hcc;
+    private Scheduler scheduler;
+    
+    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+        try {
+            hcc = new HyracksConnection(ipAddress, port);
+            scheduler = new Scheduler(hcc.getNodeControllerInfos());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+        this.numPartitionPerMachine = numPartitionPerMachine;
+    }
+    
+    public void runJob(GenomixJobConf job) throws HyracksException {
+        runJob(job, Plan.CHECK_KMERREADER, false);
+    }
+
+    public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
+        /** add hadoop configurations */
+        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+        job.addResource(hadoopCore);
+        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+        job.addResource(hadoopMapRed);
+        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+        job.addResource(hadoopHdfs);
+
+        LOG.info("job started");
+        long start = System.currentTimeMillis();
+        long end = start;
+        long time = 0;
+
+        this.profiling = profiling;
+        try {
+            Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+            LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+            switch (planChoice) {
+                case CHECK_KMERREADER:
+                default:
+                    jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
+            }
+
+            start = System.currentTimeMillis();
+            run(jobGen);
+            end = System.currentTimeMillis();
+            time = end - start;
+            LOG.info("result writing finished " + time + "ms");
+            LOG.info("job finished");
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+    
+    private void run(JobGen jobGen) throws Exception {
+        JobSpecification createJob = jobGen.generateJob();
+        execute(createJob);
+    }
+    
+    private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        hcc.waitForCompletion(jobId);
+    }
+}
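
A sketch of how the new driver would be invoked (not part of the patch; host and port
are placeholders, and Plan currently only offers CHECK_KMERREADER):

    JobConf hadoopConf = new JobConf();
    GenomixJobConf job = new GenomixJobConf(hadoopConf);
    Driver driver = new Driver("localhost", 3099, 2);
    driver.runJob(job, Driver.Plan.CHECK_KMERREADER, false);
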
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..eadf046
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings("deprecation")
+public class JobGenBrujinGraph extends JobGen {
+    private static final long serialVersionUID = 1L;
+
+    public enum GroupbyType {
+        EXTERNAL,
+        PRECLUSTER,
+        HYBRIDHASH,
+    }
+
+    public enum OutputFormat {
+        TEXT,
+        BINARY,
+    }
+
+    protected ConfFactory hadoopJobConfFactory;
+    protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+    protected String[] ncNodeNames;
+    protected String[] readSchedule;
+
+    protected int readLength;
+    protected int kmerSize;
+    protected int frameLimits;
+    protected int frameSize;
+    protected int tableSize;
+    protected GroupbyType groupbyType;
+    protected OutputFormat outputFormat;
+    protected boolean bGenerateReversedKmer;
+
+    protected void logDebug(String status) {
+        LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+    }
+
+    public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job);
+        String[] nodes = new String[ncMap.size()];
+        ncMap.keySet().toArray(nodes);
+        ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+        for (int i = 0; i < numPartitionPerMachine; i++) {
+            System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+        }
+        initJobConfiguration(scheduler);
+    }
+    
+    protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
+        Configuration conf = confFactory.getConf();
+        readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+        kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
+        // keep k odd: an even-length kmer could be its own reverse complement
+        if (kmerSize % 2 == 0) {
+            kmerSize--;
+            conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
+        }
+        frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+        tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+        frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
+
+        bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
+
+        String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+        if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
+            groupbyType = GroupbyType.EXTERNAL;
+        } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
+            groupbyType = GroupbyType.PRECLUSTER;
+        } else {
+            groupbyType = GroupbyType.HYBRIDHASH;
+        }
+
+        String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+        if (output.equalsIgnoreCase("text")) {
+            outputFormat = OutputFormat.TEXT;
+        } else {
+            outputFormat = OutputFormat.BINARY;
+        }
+        try {
+            hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+            readSchedule = scheduler.getLocationConstraints(splits);
+        } catch (IOException ex) {
+            throw new HyracksDataException(ex);
+        }
+
+        LOG.info("Genomix Graph Build Configuration");
+        LOG.info("Kmer:" + kmerSize);
+        LOG.info("Groupby type:" + type);
+        LOG.info("Output format:" + output);
+        LOG.info("Frame limit" + frameLimits);
+        LOG.info("Frame size" + frameSize);
+    }
+    
+    public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+        try {
+            InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+                    .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+            return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+                            kmerSize, bGenerateReversedKmer));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+            IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+        jobSpec.connect(conn, preOp, 0, nextOp, 0);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+        // concrete plans (e.g. JobGenCheckReader) override this
+        return null;
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
new file mode 100644
index 0000000..7c8f10e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.oldtype.PositionWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerReader(jobSpec, readOperator);
+
+        return jobSpec;
+    }
+
+    public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+            HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+        // Output Kmer
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        return new ITupleWriter() {
+
+                            private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+                            private PositionWritable pos = new PositionWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                try {
+                                    if (kmer.getLength() > tuple
+                                            .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+                                        throw new IllegalArgumentException("Not enough kmer bytes");
+                                    }
+                                    kmer.setNewReference(
+                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+                                    pos.setNewReference(tuple.getFieldData(ReadsKeyValueParserFactory.OutputPosition),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputPosition));
+
+                                    output.write(kmer.toString().getBytes());
+                                    output.writeByte('\t');
+                                    output.write(pos.toString().getBytes());
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
new file mode 100644
index 0000000..d4798e6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.genomix.hyracks.newgraph.test;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver;
+import edu.uci.ics.genomix.hyracks.newgraph.driver.Driver.Plan;
+
+public class JobRun {
+    private static final int KmerSize = 5;
+    private static final int ReadLength = 8;
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+
+    private static final String DATA_INPUT_PATH = "src/test/resources/data/webmap/text.txt";
+    private static final String HDFS_INPUT_PATH = "/webmap";
+    private static final String HDFS_OUTPUT_PATH = "/webmap_result";
+
+    private static final String EXPECTED_DIR = "src/test/resources/expected/";
+    private static final String EXPECTED_READER_RESULT = EXPECTED_DIR + "result_after_initial_read";
+    private static final String EXPECTED_OUPUT_KMER = EXPECTED_DIR + "result_after_kmerAggregate";
+    private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
+    private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
+    private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
+    private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
+
+    private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
+    private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
+    
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
+    private int numPartitionPerMachine = 2;
+    
+    private Driver driver;
+    
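+    @Before
+    public void setUp() throws Exception {
+        // Setup sketch mirroring the project's other genomix-hyracks tests; it assumes
+        // commons-io FileUtils and HyracksUtils' CC_HOST / TEST_HYRACKS_CC_CLIENT_PORT
+        // are available here as they are in those tests. Without it, driver and
+        // dfsCluster stay null and TestReader()/tearDown() fail with NullPointerExceptions.
+        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+
+        FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+        FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+        conf.setInt(GenomixJobConf.KMER_LENGTH, KmerSize);
+        conf.setInt(GenomixJobConf.READ_LENGTH, ReadLength);
+        driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+                edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
+    }
+
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+
+        FileSystem dfs = FileSystem.get(conf);
+        dfs.mkdirs(new Path(HDFS_INPUT_PATH));
+        dfs.copyFromLocalFile(new Path(DATA_INPUT_PATH), new Path(HDFS_INPUT_PATH));
+
+        // persist the effective configuration for debugging
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.close();
+    }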
+    @Test
+    public void TestAll() throws Exception {
+        TestReader();
+    }
+    
+    public void TestReader() throws Exception {
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+    }
+    
+    private void cleanUpReEntry() throws IOException {
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        if (lfs.exists(new Path(DUMPED_RESULT))) {
+            lfs.delete(new Path(DUMPED_RESULT), true);
+        }
+        FileSystem dfs = FileSystem.get(conf);
+        if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
+            dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
+        }
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
+        cleanupHDFS();
+    }
+
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
+}