Complete Hadoop velvet graph building: fix mapper indexing and reducer kmer-pairing bugs, enable the sort/grouping comparators, thread a -read-length option through the driver, and remove the unused MergePathsH3 classes and test
diff --git a/genomix/genomix-hadoop/data/webmap/text.txt b/genomix/genomix-hadoop/data/webmap/text.txt
index 13190dd..01c49e5 100755
--- a/genomix/genomix-hadoop/data/webmap/text.txt
+++ b/genomix/genomix-hadoop/data/webmap/text.txt
@@ -1,6 +1,6 @@
 1	AATAGAAG
-2	AATAGAAG
+2	AATAGCTT
 3	AATAGAAG
-4	AATAGAAG
+4	AATAGCTT
 5	AATAGAAG
 6	AGAAGAAG
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
deleted file mode 100644
index ddb2a64..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
-import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-@SuppressWarnings("deprecation")
-public class MergePathsH3 extends Configured implements Tool {
-    /*
-     * Flags used when sending messages
-     */
-    public static class MessageFlag {
-        public static final byte EMPTY_MESSAGE = 0;
-        public static final byte FROM_SELF = 1;
-        public static final byte FROM_SUCCESSOR = 1 << 1;
-        public static final byte IS_HEAD = 1 << 2;
-        public static final byte FROM_PREDECESSOR = 1 << 3;
-
-        public static String getFlagAsString(byte code) {
-            // TODO: allow multiple flags to be set
-            switch (code) {
-                case EMPTY_MESSAGE:
-                    return "EMPTY_MESSAGE";
-                case FROM_SELF:
-                    return "FROM_SELF";
-                case FROM_SUCCESSOR:
-                    return "FROM_SUCCESSOR";
-            }
-            return "ERROR_BAD_MESSAGE";
-        }
-    }
-
-    /*
-     * Common functionality for the two mapper types needed.  See javadoc for MergePathsH3MapperSubsequent.
-     */
-    private static class MergePathsH3MapperBase extends MapReduceBase {
-
-        protected static long randSeed;
-        protected Random randGenerator;
-        protected float probBeingRandomHead;
-
-        protected int KMER_SIZE;
-        protected PositionWritable outputKey;
-        protected MessageWritableNodeWithFlag outputValue;
-        protected NodeWritable curNode;
-
-        public void configure(JobConf conf) {
-            randSeed = conf.getLong("randomSeed", 0);
-            randGenerator = new Random(randSeed);
-            probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
-
-            KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            outputKey = new PositionWritable();
-            curNode = new NodeWritable(KMER_SIZE);
-        }
-
-        protected boolean isNodeRandomHead(PositionWritable nodeID) {
-            // "deterministically random", based on node id
-            randGenerator.setSeed(randSeed ^ nodeID.hashCode());
-            return randGenerator.nextFloat() < probBeingRandomHead;
-        }
-    }
-
-    /*
-     * Mapper class: Partition the graph using random pseudoheads.
-     * Heads send themselves to their successors, and all others map themselves.
-     */
-    private static class MergePathsH3MapperSubsequent extends MergePathsH3MapperBase implements
-            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-        @Override
-        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
-            curNode = value.getNode();
-            // Map all path vertices; tail nodes are sent to their predecessors
-            if (curNode.isPathNode()) {
-                boolean isHead = (value.getFlag() & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD;
-                if (isHead || isNodeRandomHead(curNode.getNodeID())) {
-                    // head nodes send themselves to their successor
-                    outputKey.set(curNode.getOutgoingList().getPosition(0));
-                    outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
-                    output.collect(outputKey, outputValue);
-                } else {
-                    // tail nodes map themselves
-                    outputValue.set(MessageFlag.FROM_SELF, curNode);
-                    output.collect(key, outputValue);
-                }
-            }
-        }
-    }
-
-    /*
-     * Mapper used for the first iteration.  See javadoc for MergePathsH3MapperSubsequent.
-     */
-    private static class MergePathsH3MapperInitial extends MergePathsH3MapperBase implements
-            Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
-        @Override
-        public void map(NodeWritable key, NullWritable value,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
-            curNode = key;
-            // Map all path vertices; tail nodes are sent to their predecessors
-            if (curNode.isPathNode()) {
-                if (isNodeRandomHead(curNode.getNodeID())) {
-                    // head nodes send themselves to their successor
-                    outputKey.set(curNode.getOutgoingList().getPosition(0));
-                    outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
-                    output.collect(outputKey, outputValue);
-                } else {
-                    // tail nodes map themselves
-                    outputValue.set(MessageFlag.FROM_SELF, curNode);
-                    output.collect(key.getNodeID(), outputValue);
-                }
-            }
-        }
-    }
-
-    /*
-     * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
-     */
-    private static class MergePathsH3Reducer extends MapReduceBase implements
-            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
-        private int KMER_SIZE;
-        private MessageWritableNodeWithFlag inputValue;
-        private MessageWritableNodeWithFlag outputValue;
-        private NodeWritable headNode;
-        private NodeWritable tailNode;
-        private int count;
-
-        public void configure(JobConf conf) {
-            KMER_SIZE = conf.getInt("sizeKmer", 0);
-            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
-            headNode = new NodeWritable(KMER_SIZE);
-            tailNode = new NodeWritable(KMER_SIZE);
-        }
-
-        @Override
-        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
-                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
-                throws IOException {
-
-            inputValue = values.next();
-            if (!values.hasNext()) {
-                // all single nodes must be remapped
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // FROM_SELF => remap self
-                    output.collect(key, inputValue);
-                } else {
-                    // FROM_PREDECESSOR => remap predecessor
-                    output.collect(inputValue.getNode().getNodeID(), inputValue);
-                }
-            } else {
-                // multiple inputs => a merge will take place. Aggregate both, then collect the merged path
-                count = 0;
-                while (true) { // process values; break when no more
-                    count++;
-                    if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
-                        headNode.set(inputValue.getNode());
-                    } else {
-                        tailNode.set(inputValue.getNode());
-                    }
-                    if (!values.hasNext()) {
-                        break;
-                    } else {
-                        inputValue = values.next();
-                    }
-                }
-                if (count != 2) {
-                    throw new IOException("Expected two nodes in MergePathsH3 reduce; saw " + String.valueOf(count));
-                }
-                // merge the head and tail as saved output, this merged node is now a head 
-                headNode.mergeNext(tailNode, KMER_SIZE);
-                outputValue.set(MessageFlag.IS_HEAD, headNode);
-                output.collect(key, outputValue);
-            }
-        }
-    }
-
-    /*
-     * Run one iteration of the mergePaths algorithm
-     */
-    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
-        JobConf conf = new JobConf(baseConf);
-        conf.setJarByClass(MergePathsH3.class);
-        conf.setJobName("MergePathsH3 " + inputPath);
-
-        FileInputFormat.addInputPath(conf, new Path(inputPath));
-        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
-
-        conf.setInputFormat(SequenceFileInputFormat.class);
-
-        conf.setMapOutputKeyClass(PositionWritable.class);
-        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
-        conf.setOutputKeyClass(PositionWritable.class);
-        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
-
-        // on the first iteration, we have to transform from a node-oriented graph 
-        // to a Position-oriented graph
-        if (conf.getInt("iMerge", 1) == 1) {
-            conf.setMapperClass(MergePathsH3MapperInitial.class);
-        } else {
-            conf.setMapperClass(MergePathsH3MapperSubsequent.class);
-        }
-        conf.setReducerClass(MergePathsH3Reducer.class);
-
-        FileSystem.get(conf).delete(new Path(outputPath), true);
-
-        return JobClient.runJob(conf);
-    }
-
-    @Override
-    public int run(String[] arg0) throws Exception {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-        int res = ToolRunner.run(new Configuration(), new MergePathsH3(), args);
-        System.exit(res);
-    }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
deleted file mode 100644
index 4982439..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-@SuppressWarnings("deprecation")
-public class MergePathsH3Driver {
-
-    private static class Options {
-        @Option(name = "-inputpath", usage = "the input path", required = true)
-        public String inputPath;
-
-        @Option(name = "-outputpath", usage = "the output path", required = true)
-        public String outputPath;
-
-        @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
-        public String mergeResultPath;
-
-        @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
-        public int numReducers;
-
-        @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
-        public int sizeKmer;
-
-        @Option(name = "-merge-rounds", usage = "the while rounds of merging", required = true)
-        public int mergeRound;
-
-    }
-
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound)
-            throws IOException {
-        JobConf baseConf = new JobConf(); // I don't know the semantics here.  do i use a base conf file or something?
-        baseConf.setNumReduceTasks(numReducers);
-        baseConf.setInt("sizeKmer", sizeKmer);
-
-        FileSystem dfs = FileSystem.get(baseConf);
-        String prevOutput = inputPath;
-        dfs.delete(new Path(outputPath), true); // clear any previous output
-
-        String tmpOutputPath = "NO_JOBS_DONE";
-        for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
-            baseConf.setInt("iMerge", iMerge);
-            MergePathsH3 merger = new MergePathsH3();
-            tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
-            merger.run(prevOutput, tmpOutputPath, baseConf);
-        }
-        dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
-        MergePathsH3Driver driver = new MergePathsH3Driver();
-        driver.run(options.inputPath, options.outputPath, options.numReducers, 
-                options.sizeKmer, options.mergeRound);
-    }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
index 013dd72..e02b4f3 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -23,7 +23,6 @@
     private static int LAST_POSID;
     private static int KMER_SIZE;
     private static int READ_LENGTH;
-
     @Override
     public void configure(JobConf job) {
         KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
@@ -71,13 +70,13 @@
             positionEntry.set(outputListInRead.getPosition(i));
             tempPosList.reset();
             for (int j = 0; j < attriListInRead.getCountOfPosition(); j++) {
-                tempVertex.set(attriListInRead.getPosition(i));
+                tempVertex.set(attriListInRead.getPosition(j));
                 if (tempVertex.getReadID() != positionEntry.getReadID()) {
                     tempPosList.append(tempVertex);
                 }
             }
             outputListAndKmer.set(tempPosList, kmer);
             output.collect(positionEntry, outputListAndKmer);
         }
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
index eb37c46..ec394fe 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -16,34 +16,6 @@
 @SuppressWarnings("deprecation")
 public class DeepGraphBuildingReducer extends MapReduceBase implements
         Reducer<PositionWritable, PositionListAndKmerWritable, NodeWritable, NullWritable> {
-    public class nodeToMergeState {
-        public static final byte NOT_UPDATE = 0;
-        public static final byte ASSIGNED_BY_RIGHTNODE = 1;
-        private byte state;
-
-        public nodeToMergeState() {
-            state = NOT_UPDATE;
-        }
-
-        public void setToNotUpdate() {
-            state = NOT_UPDATE;
-        }
-
-        public void setToAssignedByRightNode() {
-            state = ASSIGNED_BY_RIGHTNODE;
-        }
-
-        public String getState() {
-            switch (state) {
-                case NOT_UPDATE:
-                    return "NOT_UPDATE";
-                case ASSIGNED_BY_RIGHTNODE:
-                    return "ASSIGNED_BY_RIGHTNODE";
-            }
-            return "ERROR_STATE";
-        }
-    }
-
     private PositionListAndKmerWritable curNodePosiListAndKmer = new PositionListAndKmerWritable();
     private PositionListAndKmerWritable curNodeNegaListAndKmer = new PositionListAndKmerWritable();
     private PositionListAndKmerWritable nextNodePosiListAndKmer = new PositionListAndKmerWritable();
@@ -72,6 +44,6 @@
     public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
             OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
         int readID = key.getReadID();
         byte posInRead = (byte) 1;
         resetNode(curNode, readID, posInRead);
         assembleFirstTwoNodesInRead(curNodePosiListAndKmer, nextNodePosiListAndKmer, nextNodeNegaListAndKmer,
@@ -80,67 +52,80 @@
         assembleNodeFromValues(readID, posInRead, curNodePosiListAndKmer, curNodeNegaListAndKmer,
                 nextNodePosiListAndKmer, nextNodeNegaListAndKmer, incomingList, outgoingList, curNode, nextNode, values);
         posInRead++;
-        while (values.hasNext()) {
-            assembleNodeFromValues(readID, posInRead, curNodePosiListAndKmer, curNodeNegaListAndKmer,
-                    nextNodePosiListAndKmer, nextNodeNegaListAndKmer, incomingList, outgoingList, curNode, nextNextNode, values);
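+        // walk the rest of the read: keep merging simple neighbors, emitting curNode whenever a branch appears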
+        boolean flag = true;
+        while (flag) {
+            flag = assembleNodeFromValues(readID, posInRead, curNodePosiListAndKmer, curNodeNegaListAndKmer,
+                    nextNodePosiListAndKmer, nextNodeNegaListAndKmer, incomingList, outgoingList, nextNode,
+                    nextNextNode, values);
             posInRead++;
             if (curNode.inDegree() > 1 || curNode.outDegree() > 0 || nextNode.inDegree() > 0
                     || nextNode.outDegree() > 0 || nextNextNode.inDegree() > 0 || nextNextNode.outDegree() > 0) {
-                connect(curNode, nextNextNode);
+                connect(curNode, nextNode);
                 output.collect(curNode, nullWritable);
                 curNode.set(nextNode);
-                nextNode.set(nextNode);
+                nextNode.set(nextNextNode);
                 continue;
             }
             curNode.mergeForwadNext(nextNode, KMER_SIZE);
             nextNode.set(nextNextNode);
         }
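+        // emit the last (possibly merged) node once the values are exhausted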
+        output.collect(curNode, nullWritable);
     }
 
-    public void assembleNodeFromValues(int readID, byte posInRead, PositionListAndKmerWritable curNodePosiListAndKmer,
+    public boolean assembleNodeFromValues(int readID, byte posInRead, PositionListAndKmerWritable curNodePosiListAndKmer,
             PositionListAndKmerWritable curNodeNegaListAndKmer, PositionListAndKmerWritable nextNodePosiListAndKmer,
             PositionListAndKmerWritable nextNodeNegaListAndKmer, PositionListWritable outgoingList,
             PositionListWritable incomingList, NodeWritable curNode, NodeWritable nextNode,
             Iterator<PositionListAndKmerWritable> values) throws IOException {
-
+        boolean flag = true;
         curNodePosiListAndKmer.set(nextNodePosiListAndKmer);
         curNodeNegaListAndKmer.set(nextNodeNegaListAndKmer);
         if (values.hasNext()) {
-            nextNodePosiListAndKmer.set(values.next());
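+            // values arrive in pairs: the reverse(-) entry first, then the forward(+) entry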
+            nextNodeNegaListAndKmer.set(values.next());
             if (values.hasNext()) {
-                nextNodeNegaListAndKmer.set(values.next());
+                nextNodePosiListAndKmer.set(values.next());
             } else {
-                throw new IOException("lose the paired kmer");
+                throw new IOException("lost the paired kmer while reading values");
+            }
+            outgoingList.reset();
+            outgoingList.set(nextNodePosiListAndKmer.getVertexIDList());
+            setForwardOutgoingList(curNode, outgoingList);
+
+            resetNode(nextNode, readID, posInRead);
+            nextNode.setKmer(nextNodePosiListAndKmer.getKmer());
+
+            outgoingList.reset();
+            outgoingList.set(curNodeNegaListAndKmer.getVertexIDList());
+            setReverseOutgoingList(nextNode, outgoingList);
+
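+            // the last kmer of a read has no forward successor, so take its incoming edges from the reverse list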
+            if (nextNode.getNodeID().getPosInRead() == LAST_POSID) {
+                incomingList.reset();
+                incomingList.set(nextNodeNegaListAndKmer.getVertexIDList());
+                setReverseIncomingList(nextNode, incomingList);
             }
         }
-
-        outgoingList.reset();
-        outgoingList.set(nextNodePosiListAndKmer.getVertexIDList());
-        setForwardOutgoingList(curNode, outgoingList);
-
-        resetNode(nextNode, readID, posInRead);
-        nextNode.setKmer(nextNodePosiListAndKmer.getKmer());
-
-        outgoingList.reset();
-        outgoingList.set(curNodeNegaListAndKmer.getVertexIDList());
-        setReverseOutgoingList(nextNode, outgoingList);
-
-        if (nextNode.getNodeID().getPosInRead() == LAST_POSID) {
-            incomingList.reset();
-            incomingList.set(nextNodeNegaListAndKmer.getVertexIDList());
-            setReverseIncomingList(nextNode, incomingList);
+        else {
+            flag = false;
+            resetNode(nextNode, readID, (byte)0);
         }
+        return flag;
     }
 
     public void assembleFirstTwoNodesInRead(PositionListAndKmerWritable curNodePosiListAndKmer,
             PositionListAndKmerWritable nextNodePosiListAndKmer, PositionListAndKmerWritable nextNodeNegaListAndKmer,
             PositionListWritable outgoingList, PositionListWritable incomingList, NodeWritable curNode,
             NodeWritable nextNode, Iterator<PositionListAndKmerWritable> values) throws IOException {
-        nextNodePosiListAndKmer.set(values.next());
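+        // as in assembleNodeFromValues, the reverse(-) entry precedes the forward(+) one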
+        nextNodeNegaListAndKmer.set(values.next());
         if (values.hasNext()) {
-            nextNodeNegaListAndKmer.set(values.next());
+            nextNodePosiListAndKmer.set(values.next());
         } else {
-            throw new IOException("lose the paired kmer");
+            throw new IOException("lost the paired kmer for the first two nodes of read "
+                    + curNode.getNodeID().getReadID());
         }
 
         if (curNode.getNodeID().getPosInRead() == LAST_POSID) {
@@ -151,7 +136,7 @@
         incomingList.reset();
         incomingList.set(nextNodePosiListAndKmer.getVertexIDList());
 
-        curNode.setKmer(curNodePosiListAndKmer.getKmer());
+        curNode.setKmer(nextNodePosiListAndKmer.getKmer());
         setForwardIncomingList(curNode, incomingList);
     }
 
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
index 4726380..1c68114 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -38,6 +38,9 @@
 
         @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
         public int sizeKmer;
+
+        @Option(name = "-read-length", usage = "the length of read", required = true)
+        public int readLength;
 
         @Option(name = "-onlytest1stjob", usage = "test", required = true)
         public boolean onlyTest1stJob;
@@ -46,17 +49,18 @@
         public boolean seqOutput;
     }
 
-    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, boolean onlyTest1stJob,
+    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength, boolean onlyTest1stJob,
             boolean seqOutput, String defaultConfPath) throws IOException {
         if (onlyTest1stJob == true) {
-            runfirstjob(inputPath, numReducers, sizeKmer, seqOutput, defaultConfPath);
+            runfirstjob(inputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
         } else {
-            runfirstjob(inputPath, numReducers, sizeKmer, true, defaultConfPath);
-            runsecondjob(inputPath, outputPath, numReducers, sizeKmer, seqOutput, defaultConfPath);
+            runfirstjob(inputPath, 2, sizeKmer, readLength, true, defaultConfPath);
+            runsecondjob(inputPath, outputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
         }
     }
 
-    public void runfirstjob(String inputPath, int numReducers, int sizeKmer, boolean seqOutput, String defaultConfPath)
+    public void runfirstjob(String inputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput, String defaultConfPath)
             throws IOException {
         JobConf conf = new JobConf(GraphBuildingDriver.class);
         conf.setInt("sizeKmer", sizeKmer);
@@ -90,14 +93,17 @@
         JobClient.runJob(conf);
     }
 
-    public void runsecondjob(String inputPath, String outputPath, int numReducers, int sizeKmer, boolean seqOutput,
+    public void runsecondjob(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength, boolean seqOutput,
             String defaultConfPath) throws IOException {
         JobConf conf = new JobConf(GraphBuildingDriver.class);
         if (defaultConfPath != null) {
             conf.addResource(new Path(defaultConfPath));
         }
         conf.setJobName("deep build");
-
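+        // these settings were missing before, leaving the second job's mapper without a kmer size or read length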
+        conf.setInt("sizeKmer", sizeKmer);
+        conf.setInt("readLength", readLength);
+
         conf.setMapperClass(DeepGraphBuildingMapper.class);
         conf.setReducerClass(DeepGraphBuildingReducer.class);
 
@@ -106,9 +113,10 @@
 
         conf.setPartitionerClass(ReadIDPartitioner.class);
 
- //       conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
- //       conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
-
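+        // sort values by (readID, posInRead) but group a whole read into a single reduce call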
+        conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
+        conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
+
         conf.setInputFormat(SequenceFileInputFormat.class);
         if (seqOutput == true)
             conf.setOutputFormat(SequenceFileOutputFormat.class);
@@ -136,7 +144,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
         GraphBuildingDriver driver = new GraphBuildingDriver();
-        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer,
+        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, options.readLength,
                 options.onlyTest1stJob, options.seqOutput, null);
     }
 }
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index e78e921..a44dcd2 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -54,10 +54,13 @@
         /** reverse first kmer */
         outputKmer.setByReadReverse(array, 0);
         outputVertexID.set(readID, (byte) -1);
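+        // the first reverse kmer was never emitted before; collect it like the others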
+        output.collect(outputKmer, outputVertexID);
         /** reverse middle kmer */
         for (int i = KMER_SIZE; i < array.length; i++) {
             outputKmer.shiftKmerWithPreCode(GeneCode.getPairedCodeFromSymbol(array[i]));
-            outputVertexID.set(readID, (byte) (-2 + KMER_SIZE - i));
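+                // reverse kmers take negative positions, counting down from -2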
+            outputVertexID.set(readID, (byte)(KMER_SIZE - i - 2));
             output.collect(outputKmer, outputVertexID);
         }
     }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
index 362ebf3..682e8d4 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
@@ -6,10 +6,10 @@
 import edu.uci.ics.genomix.type.PositionListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
-public class ReadIDPartitioner implements Partitioner<PositionWritable, PositionListWritable>{
+public class ReadIDPartitioner implements Partitioner<PositionWritable, PositionListAndKmerWritable>{
     
     @Override
-    public  int getPartition(PositionWritable key, PositionListWritable value, int numPartitions){
+    public int getPartition(PositionWritable key, PositionListAndKmerWritable value, int numPartitions) {
         return (key.getReadID() & Integer.MAX_VALUE) % numPartitions;
     }
 
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
deleted file mode 100644
index d8d4429..0000000
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/TestPathMergeH3.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.hadoop.pmcommon.GenomixMiniClusterTest;
-import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-@SuppressWarnings("deprecation")
-public class TestPathMergeH3 extends GenomixMiniClusterTest {
-    protected String LOCAL_SEQUENCE_FILE = "src/test/resources/data/webmap/text.txt";
-    protected String HDFS_SEQUENCE = "/00-sequence/";
-    protected String HDFS_GRAPHBUILD = "/01-graphbuild/";
-    protected String HDFS_MERGED = "/02-graphmerge/";
-    
-    protected String GRAPHBUILD_FILE = "result.graphbuild.txt";
-    protected String PATHMERGE_FILE = "result.mergepath.txt";
-    
-    {
-        KMER_LENGTH = 5;
-        READ_LENGTH = 8;
-        HDFS_PATHS = new ArrayList<String>(Arrays.asList(HDFS_SEQUENCE, HDFS_GRAPHBUILD, HDFS_MERGED));
-        // we have to specify what kind of keys and values this job has
-        key = new NodeWritable(KMER_LENGTH);
-        value = NullWritable.get();
-    }
-
-    @Test
-    public void TestBuildGraph() throws Exception {
-        cleanUpOutput();
-        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
-        buildGraph();
-    }
-
-//    @Test
-    public void TestMergeOneIteration() throws Exception {
-        cleanUpOutput();
-        copyLocalToDFS(LOCAL_SEQUENCE_FILE, HDFS_SEQUENCE);
-        buildGraph();
-        MergePathsH3Driver h3 = new MergePathsH3Driver();
-        h3.run(HDFS_GRAPHBUILD, HDFS_MERGED, 2, KMER_LENGTH, 1, ACTUAL_ROOT + "conf.xml", null);
-        copyResultsToLocal(HDFS_MERGED, ACTUAL_ROOT + PATHMERGE_FILE, conf);
-    }
-
-
-
-    public void buildGraph() throws Exception {
-        FileInputFormat.setInputPaths(conf, HDFS_SEQUENCE);
-        FileOutputFormat.setOutputPath(conf, new Path(HDFS_GRAPHBUILD));
-        conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
-        conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
-        driver.runJob(new GenomixJobConf(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
-        copyResultsToLocal(HDFS_GRAPHBUILD, ACTUAL_ROOT + GRAPHBUILD_FILE, conf);
-    }
-}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
index cc24133..871b094 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/NewGraphBuildingTest.java
@@ -29,10 +29,12 @@
     private static final String DATA_PATH = "data/webmap/text.txt";
     private static final String HDFS_PATH = "/webmap";
     private static final String RESULT_PATH = "/result1";
-    private static final String EXPECTED_PATH = "expected/result_after_kmerAggregate";
-    private static final int COUNT_REDUCER = 0;
+    private static final String EXPECTED_PATH = "expected/";
+    private static final int COUNT_REDUCER = 2;
     private static final int SIZE_KMER = 5;
+    private static final int READ_LENGTH = 8;
     private static final String GRAPHVIZ = "Graphviz";
+    private static final String EXPECTED_OUTPUT_KMER = EXPECTED_PATH + "result_after_kmerAggregate";
     
     private MiniDFSCluster dfsCluster;
     private MiniMRCluster mrCluster;
@@ -45,8 +47,8 @@
         FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
         startHadoop();
 //        TestGroupbyKmer();
-        TestMapKmerToRead();
-
+//        TestMapKmerToRead();
+        TestGroupByReadID();
 /*        SequenceFile.Reader reader = null;
         Path path = new Path(RESULT_PATH + "/part-00000");
         reader = new SequenceFile.Reader(dfs, path, conf); 
@@ -92,19 +94,21 @@
 
     public void TestGroupbyKmer() throws Exception {
         GraphBuildingDriver tldriver = new GraphBuildingDriver();
-        tldriver.run(HDFS_PATH, RESULT_PATH, COUNT_REDUCER, SIZE_KMER, true, false, HADOOP_CONF_PATH);
+        tldriver.run(HDFS_PATH, RESULT_PATH, COUNT_REDUCER, SIZE_KMER, READ_LENGTH, true, false, HADOOP_CONF_PATH);
         dumpGroupByKmerResult();
-        TestUtils.compareWithResult(new File(ACTUAL_RESULT_DIR + HDFS_PATH + "-step1" + "/part-00000"), new File(EXPECTED_PATH));
+//        TestUtils.compareWithResult(new File(ACTUAL_RESULT_DIR + HDFS_PATH + "-step1" + "/part-00000"), new File(EXPECTED_OUTPUT_KMER));
     }
 
     public void TestMapKmerToRead() throws Exception {
         GraphBuildingDriver tldriver = new GraphBuildingDriver();
-        tldriver.run(HDFS_PATH, RESULT_PATH, COUNT_REDUCER, SIZE_KMER, false, false, HADOOP_CONF_PATH);
+        tldriver.run(HDFS_PATH, RESULT_PATH, 0, SIZE_KMER, READ_LENGTH, false, false, HADOOP_CONF_PATH);
         dumpResult();
     }
 
     public void TestGroupByReadID() throws Exception {
-        
+        GraphBuildingDriver tldriver = new GraphBuildingDriver();
+        tldriver.run(HDFS_PATH, RESULT_PATH, 2, SIZE_KMER, READ_LENGTH, false, false, HADOOP_CONF_PATH);
+        dumpResult();
     }
     
     private void startHadoop() throws IOException {