add the read checker for hyracks
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
index c903645..5409f34 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/driver/Driver.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.genomix.hyracks.newgraph.job.GenomixJobConf;
 import edu.uci.ics.genomix.hyracks.newgraph.job.JobGen;
 import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.newgraph.job.JobGenCheckReader;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +41,7 @@
 public class Driver {
     public static enum Plan {
         BUILD_DEBRUJIN_GRAPH,
+        CHECK_KMERREADER,
     }
 
     private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -90,6 +92,9 @@
                 default:
                     jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
                     break;
+                case CHECK_KMERREADER:
+                    jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+                    break;
             }
 
             start = System.currentTimeMillis();
@@ -115,8 +120,9 @@
 
     private void execute(JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc
-                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        if(profiling)
+            EnumSet.of(JobFlag.PROFILE_RUNTIME);
+        JobId jobId = hcc.startJob(job, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         hcc.waitForCompletion(jobId);
     }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index 1a95ac2..afc1cf7 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -209,13 +209,10 @@
         logDebug("Group by Kmer");
         AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
 
-        // logDebug("Write kmer to result");
-        // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
-
         logDebug("Write node to result");
         lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);
 
-        jobSpec.addRoot(lastOperator);
+        jobSpec.addRoot(readOperator);//what's this? why we need this? why I can't seet it in the JobGenCheckReader
         return jobSpec;
     }
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
new file mode 100644
index 0000000..f512f43
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCheckReader extends JobGenBrujinGraph {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    public JobGenCheckReader(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+            int numPartitionPerMachine) throws HyracksDataException {
+        super(job, scheduler, ncMap, numPartitionPerMachine);
+    }
+
+    @Override
+    public JobSpecification generateJob() throws HyracksException {
+
+        JobSpecification jobSpec = new JobSpecification();
+        logDebug("ReadKmer Operator");
+        HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+        logDebug("Write kmer to result");
+        generateRootByWriteKmerReader(jobSpec, readOperator);
+
+        return jobSpec;
+    }
+
+    public AbstractSingleActivityOperatorDescriptor generateRootByWriteKmerReader(JobSpecification jobSpec,
+            HDFSReadOperatorDescriptor readOperator) throws HyracksException {
+        
+        HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+                hadoopJobConfFactory.getConf(), new ITupleWriterFactory() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+                        KmerBytesWritable.setGlobalKmerLength(kmerSize);
+                        return new ITupleWriter() {
+
+                            private NodeWritable outputNode = new NodeWritable();
+                            private KmerBytesWritable outputKmer = new KmerBytesWritable();
+
+                            @Override
+                            public void open(DataOutput output) throws HyracksDataException {
+                            }
+
+                            @Override
+                            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+                                try {
+                                    if (outputKmer.getLength() > tuple
+                                            .getFieldLength(ReadsKeyValueParserFactory.OutputKmerField)) {
+                                        throw new IllegalArgumentException("Not enough kmer bytes");
+                                    }
+                                    outputKmer.setAsReference(
+                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputKmerField),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputKmerField));
+                                    outputNode.setAsReference(
+                                            tuple.getFieldData(ReadsKeyValueParserFactory.OutputNodeField),
+                                            tuple.getFieldStart(ReadsKeyValueParserFactory.OutputNodeField));
+
+                                    output.write(outputKmer.toString().getBytes());
+                                    output.writeByte('\t');
+                                    output.write(outputNode.toString().getBytes());
+                                    output.writeByte('\n');
+                                } catch (IOException e) {
+                                    throw new HyracksDataException(e);
+                                }
+                            }
+
+                            @Override
+                            public void close(DataOutput output) throws HyracksDataException {
+
+                            }
+
+                        };
+                    }
+
+                });
+        connectOperators(jobSpec, readOperator, ncNodeNames, writeKmerOperator, ncNodeNames,
+                new OneToOneConnectorDescriptor(jobSpec));
+        jobSpec.addRoot(writeKmerOperator);
+        return writeKmerOperator;
+    }
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
index bb0d131..3cd5bb9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/newgraph/test/JobRun.java
@@ -52,7 +52,15 @@
     
     @Test
     public void TestAll() throws Exception {
-        TestGroupby();
+        TestReader();
+//        TestGroupby();
+    }
+    
+    public void TestReader() throws Exception {
+        cleanUpReEntry();
+        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_TEXT);
+        driver.runJob(new GenomixJobConf(conf), Plan.CHECK_KMERREADER, true);
+        dumpResult();
     }
     
     public void TestGroupby() throws Exception {
@@ -60,7 +68,7 @@
         cleanUpReEntry();
         conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
         driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        dumpResult();
+//        dumpResult();
     }
     
     @Before
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index d446f39..51a0d15 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -76,10 +76,10 @@
     @Test
     public void TestAll() throws Exception {
 //        TestReader();
-        TestGroupbyKmer();
+//        TestGroupbyKmer();
 //        TestMapKmerToRead();
 //        TestGroupByReadID();
-//        TestEndToEnd();
+        TestEndToEnd();
 //        TestUnMergedNode();
     }