Merge branch 'jianfeng/genomix' of https://code.google.com/p/hyracks into jianfeng/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index df4e755..c0b00a7 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -43,14 +43,15 @@
         incomingList.set(incoming);
     }
 
-    public void setKmer(KmerBytesWritable kmer) {
-        this.kmer = kmer;
-    }
 
     public void setOutgoingList(PositionListWritable outgoing) {
         outgoingList.set(outgoing);
     }
 
+    public void setKmer(KmerBytesWritable right) {
+        this.kmer.set(right);
+    }
+    
     public void reset(int kmerSize) {
         nodeID.set(0, (byte) 0);
         incomingList.reset();
@@ -131,5 +132,12 @@
         sbuilder.append(kmer.toString()).append(')');
         return sbuilder.toString();
     }
+    
+    /*
+     * Returns whether this node is a "path" (compressible) node, i.e., it has
+     * an in-degree and an out-degree of exactly 1.
+     */
+    public boolean isPathNode() {
+        return incomingList.getCountOfPosition() == 1 && outgoingList.getCountOfPosition() == 1;
+    }
 
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 40817e8..11d0998 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -19,7 +19,8 @@
     protected int offset;
     protected int valueCount;
     protected static final byte[] EMPTY = {};
-
+    public static final int INTBYTES = 4;
+    
     protected PositionWritable posIter = new PositionWritable();
 
     public PositionListWritable() {
@@ -67,6 +68,14 @@
         return posIter;
     }
 
+    /*
+     * Overwrite the i'th position in place with (readID, posInRead).
+     */
+    public void resetPosition(int i, int readID, byte posInRead) {
+        if (i >= valueCount) {
+            throw new ArrayIndexOutOfBoundsException("No such position: " + i);
+        }
+        Marshal.putInt(readID, storage, offset + i * PositionWritable.LENGTH);
+        storage[offset + i * PositionWritable.LENGTH + INTBYTES] = posInRead;
+    }
+    
+    
     @Override
     public Iterator<PositionWritable> iterator() {
         Iterator<PositionWritable> it = new Iterator<PositionWritable>() {
@@ -120,7 +129,7 @@
         storage[offset + valueCount * PositionWritable.LENGTH + PositionWritable.INTBYTES] = posInRead;
         valueCount += 1;
     }
-
+    
     public static int getCountByDataLength(int length) {
         if (length % PositionWritable.LENGTH != 0) {
             throw new IllegalArgumentException("Length of positionlist is invalid");
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 132b464..f77c844 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -126,6 +126,19 @@
         }
     }
 
+    /** Compares PositionWritables by readID only, ignoring posInRead. */
+    public static class FirstComparator extends WritableComparator {
+        public FirstComparator() {
+            super(PositionWritable.class);
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            int thisValue = Marshal.getInt(b1, s1);
+            int thatValue = Marshal.getInt(b2, s2);
+            // compare rather than subtract: the difference can overflow an int
+            return (thisValue < thatValue) ? -1 : ((thisValue == thatValue) ? 0 : 1);
+        }
+    }
+    
     static { // register this comparator
         WritableComparator.define(PositionWritable.class, new Comparator());
     }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
new file mode 100644
index 0000000..ddb2a64
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3 extends Configured implements Tool {
+    /*
+     * Flags used when sending messages
+     */
+    public static class MessageFlag {
+        public static final byte EMPTY_MESSAGE = 0;
+        public static final byte FROM_SELF = 1;
+        public static final byte FROM_SUCCESSOR = 1 << 1;
+        public static final byte IS_HEAD = 1 << 2;
+        public static final byte FROM_PREDECESSOR = 1 << 3;
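+
+        // Flags occupy distinct bits, so they can be OR'd together: e.g.
+        // (byte) (FROM_PREDECESSOR | IS_HEAD) marks a head sent to its successor.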
+
+        public static String getFlagAsString(byte code) {
+            // TODO: allow multiple flags to be set
+            switch (code) {
+                case EMPTY_MESSAGE:
+                    return "EMPTY_MESSAGE";
+                case FROM_SELF:
+                    return "FROM_SELF";
+                case FROM_SUCCESSOR:
+                    return "FROM_SUCCESSOR";
+                case IS_HEAD:
+                    return "IS_HEAD";
+                case FROM_PREDECESSOR:
+                    return "FROM_PREDECESSOR";
+            }
+            return "ERROR_BAD_MESSAGE";
+        }
+    }
+
+    /*
+     * Common functionality for the two mapper types needed.  See javadoc for MergePathsH3MapperSubsequent.
+     */
+    private static class MergePathsH3MapperBase extends MapReduceBase {
+
+        protected static long randSeed;
+        protected Random randGenerator;
+        protected float probBeingRandomHead;
+
+        protected int KMER_SIZE;
+        protected PositionWritable outputKey;
+        protected MessageWritableNodeWithFlag outputValue;
+        protected NodeWritable curNode;
+
+        public void configure(JobConf conf) {
+            randSeed = conf.getLong("randomSeed", 0);
+            randGenerator = new Random(randSeed);
+            probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
+
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            outputKey = new PositionWritable();
+            curNode = new NodeWritable(KMER_SIZE);
+        }
+
+        protected boolean isNodeRandomHead(PositionWritable nodeID) {
+            // "deterministically random", based on node id
+            randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+            return randGenerator.nextFloat() < probBeingRandomHead;
+        }
+    }
+
+    /*
+     * Mapper class: Partition the graph using random pseudoheads.
+     * Heads send themselves to their successors, and all others map themselves.
+     */
+    private static class MergePathsH3MapperSubsequent extends MergePathsH3MapperBase implements
+            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+        @Override
+        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+            curNode = value.getNode();
+            // Map all path vertices; tail nodes are sent to their predecessors
+            if (curNode.isPathNode()) {
+                boolean isHead = (value.getFlag() & MessageFlag.IS_HEAD) == MessageFlag.IS_HEAD;
+                if (isHead || isNodeRandomHead(curNode.getNodeID())) {
+                    // head nodes send themselves to their successor
+                    outputKey.set(curNode.getOutgoingList().getPosition(0));
+                    outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+                    output.collect(outputKey, outputValue);
+                } else {
+                    // tail nodes map themselves
+                    outputValue.set(MessageFlag.FROM_SELF, curNode);
+                    output.collect(key, outputValue);
+                }
+            }
+        }
+    }
+
+    /*
+     * Mapper used for the first iteration.  See javadoc for MergePathsH3MapperSubsequent.
+     */
+    private static class MergePathsH3MapperInitial extends MergePathsH3MapperBase implements
+            Mapper<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag> {
+        @Override
+        public void map(NodeWritable key, NullWritable value,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+            curNode = key;
+            // Map all path vertices; tail nodes are sent to their predecessors
+            if (curNode.isPathNode()) {
+                if (isNodeRandomHead(curNode.getNodeID())) {
+                    // head nodes send themselves to their successor
+                    outputKey.set(curNode.getOutgoingList().getPosition(0));
+                    outputValue.set((byte) (MessageFlag.FROM_PREDECESSOR | MessageFlag.IS_HEAD), curNode);
+                    output.collect(outputKey, outputValue);
+                } else {
+                    // tail nodes map themselves
+                    outputValue.set(MessageFlag.FROM_SELF, curNode);
+                    output.collect(key.getNodeID(), outputValue);
+                }
+            }
+        }
+    }
+
+    /*
+     * Reducer class: merge nodes that co-occur; for singletons, remap the original nodes 
+     */
+    private static class MergePathsH3Reducer extends MapReduceBase implements
+            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+
+        private int KMER_SIZE;
+        private MessageWritableNodeWithFlag inputValue;
+        private MessageWritableNodeWithFlag outputValue;
+        private NodeWritable headNode;
+        private NodeWritable tailNode;
+        private int count;
+
+        public void configure(JobConf conf) {
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            headNode = new NodeWritable(KMER_SIZE);
+            tailNode = new NodeWritable(KMER_SIZE);
+        }
+
+        @Override
+        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+
+            inputValue = values.next();
+            if (!values.hasNext()) {
+                // all single nodes must be remapped
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+                    // FROM_SELF => remap self
+                    output.collect(key, inputValue);
+                } else {
+                    // FROM_PREDECESSOR => remap predecessor
+                    output.collect(inputValue.getNode().getNodeID(), inputValue);
+                }
+            } else {
+                // multiple inputs => a merge will take place. Aggregate both, then collect the merged path
+                count = 0;
+                while (true) { // process values; break when no more
+                    count++;
+                    if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+                        headNode.set(inputValue.getNode());
+                    } else {
+                        tailNode.set(inputValue.getNode());
+                    }
+                    if (!values.hasNext()) {
+                        break;
+                    } else {
+                        inputValue = values.next();
+                    }
+                }
+                if (count != 2) {
+                    throw new IOException("Expected two nodes in MergePathsH3 reduce; saw " + String.valueOf(count));
+                }
+                // merge the head and tail as saved output, this merged node is now a head 
+                headNode.mergeNext(tailNode, KMER_SIZE);
+                outputValue.set(MessageFlag.IS_HEAD, headNode);
+                output.collect(key, outputValue);
+            }
+        }
+    }
+
+    /*
+     * Run one iteration of the mergePaths algorithm
+     */
+    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+        JobConf conf = new JobConf(baseConf);
+        conf.setJarByClass(MergePathsH3.class);
+        conf.setJobName("MergePathsH3 " + inputPath);
+
+        FileInputFormat.addInputPath(conf, new Path(inputPath));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+        conf.setInputFormat(SequenceFileInputFormat.class);
+
+        conf.setMapOutputKeyClass(PositionWritable.class);
+        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputKeyClass(PositionWritable.class);
+        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+        // on the first iteration, we have to transform from a node-oriented graph 
+        // to a Position-oriented graph
+        if (conf.getInt("iMerge", 1) == 1) {
+            conf.setMapperClass(MergePathsH3MapperInitial.class);
+        } else {
+            conf.setMapperClass(MergePathsH3MapperSubsequent.class);
+        }
+        conf.setReducerClass(MergePathsH3Reducer.class);
+
+        FileSystem.get(conf).delete(new Path(outputPath), true);
+
+        return JobClient.runJob(conf);
+    }
+
+    @Override
+    public int run(String[] arg0) throws Exception {
+        // TODO: wire up the Tool interface; MergePathsH3Driver calls run(inputPath, ...) directly for now
+        return 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int res = ToolRunner.run(new Configuration(), new MergePathsH3(), args);
+        System.exit(res);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
new file mode 100644
index 0000000..4982439
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3Driver.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+@SuppressWarnings("deprecation")
+public class MergePathsH3Driver {
+
+    private static class Options {
+        @Option(name = "-inputpath", usage = "the input path", required = true)
+        public String inputPath;
+
+        @Option(name = "-outputpath", usage = "the output path", required = true)
+        public String outputPath;
+
+        @Option(name = "-mergeresultpath", usage = "the merging results path", required = true)
+        public String mergeResultPath;
+
+        @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+        public int numReducers;
+
+        @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+        public int sizeKmer;
+
+        @Option(name = "-merge-rounds", usage = "the while rounds of merging", required = true)
+        public int mergeRound;
+
+    }
+
+    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int mergeRound)
+            throws IOException {
+        JobConf baseConf = new JobConf(); // TODO: confirm whether a default config file should be loaded here
+        baseConf.setNumReduceTasks(numReducers);
+        baseConf.setInt("sizeKmer", sizeKmer);
+
+        FileSystem dfs = FileSystem.get(baseConf);
+        String prevOutput = inputPath;
+        dfs.delete(new Path(outputPath), true); // clear any previous output
+
+        String tmpOutputPath = "NO_JOBS_DONE";
+        for (int iMerge = 1; iMerge <= mergeRound; iMerge++) {
+            baseConf.setInt("iMerge", iMerge);
+            MergePathsH3 merger = new MergePathsH3();
+            tmpOutputPath = inputPath + ".mergepathsH3." + String.valueOf(iMerge);
+            merger.run(prevOutput, tmpOutputPath, baseConf);
+            prevOutput = tmpOutputPath; // each round consumes the previous round's output
+        }
+        dfs.rename(new Path(tmpOutputPath), new Path(outputPath)); // save final results
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+        MergePathsH3Driver driver = new MergePathsH3Driver();
+        driver.run(options.inputPath, options.outputPath, options.numReducers, 
+                options.sizeKmer, options.mergeRound);
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
new file mode 100644
index 0000000..5a3076c
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.type.NodeWritable;
+
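+/*
+ * Wraps a NodeWritable together with a one-byte message flag; used as the
+ * map-output value in the path-merge jobs.
+ */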
+public class MessageWritableNodeWithFlag extends BinaryComparable implements WritableComparable<BinaryComparable> {
+    private byte flag;
+    private NodeWritable node;
+
+    public MessageWritableNodeWithFlag(int k) {
+        this.flag = 0;
+        this.node = new NodeWritable(k);
+    }
+
+    public MessageWritableNodeWithFlag(byte flag, int kmerSize) {
+        this.flag = flag;
+        this.node = new NodeWritable(kmerSize);
+    }
+
+    public void set(MessageWritableNodeWithFlag right) {
+        set(right.getFlag(), right.getNode());
+    }
+
+    public void set(byte flag, NodeWritable node) {
+        this.node.set(node);
+        this.flag = flag;
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+        node.readFields(arg0);
+        flag = arg0.readByte();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+        node.write(arg0);
+        arg0.writeByte(flag);
+    }
+
+    public NodeWritable getNode() {
+        if (node.getCount() != 0) {
+            return node;
+        }
+        return null;
+    }
+
+    public byte getFlag() {
+        return this.flag;
+    }
+
+    @Override
+    public String toString() {
+        return node.toString() + '\t' + String.valueOf(flag);
+    }
+
+    @Override
+    public byte[] getBytes() {
+        if (node.getCount() != 0) {
+            return node.getKmer().getBytes();
+        }
+        return null;
+    }
+
+    @Override
+    public int getLength() {
+        return node.getCount();
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
deleted file mode 100644
index 591e3c7..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.IOException;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.PositionListWritable;
-
-@SuppressWarnings("deprecation")
-public class DeepGraphBuildingMapper extends MapReduceBase implements
-        Mapper<KmerBytesWritable, PositionListWritable, IntWritable, LineBasedmappingWritable> {
-    IntWritable  numLine = new IntWritable();
-    LineBasedmappingWritable lineBasedWriter = new LineBasedmappingWritable();
-    @Override
-    public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<IntWritable, LineBasedmappingWritable> output,
-            Reporter reporter) throws IOException {
-    }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
deleted file mode 100644
index 4b971a7..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.NodeWritable;
-
-@SuppressWarnings("deprecation")
-public class DeepGraphBuildingReducer extends MapReduceBase implements
-        Reducer<IntWritable, LineBasedmappingWritable, NodeWritable, NullWritable> {
-
-/*    public ArrayList<LineBasedmappingWritable> lineElementsSet = new ArrayList<LineBasedmappingWritable>();
-    public Position outputVerID = new Position();
-    public VertexAdjacentWritable outputAdjacentList = new VertexAdjacentWritable();
-    public PositionList srcVtexAdjList = new PositionList();
-    public PositionList desVtexAdjList = new PositionList();
-    public VertexIDListWritable srcAdjListWritable = new VertexIDListWritable();
-    public VKmerBytesWritable desKmer = new VKmerBytesWritable(1);
-    public VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
-    public VKmerBytesWritable srcKmer = new VKmerBytesWritable(1);*/
-    @Override
-    public void reduce(IntWritable key, Iterator<LineBasedmappingWritable> values,
-            OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
-/*        while (values.hasNext()) {
-            lineElementsSet.add(values.next());
-        }
-        int[] orderLineTable = new int[lineElementsSet.size()];
-        for (int i = 0; i < lineElementsSet.size(); i++) {
-            int posInInvertedIndex = lineElementsSet.get(i).getPosInInvertedIndex();
-            orderLineTable[lineElementsSet.get(i).getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex)] = i;
-        }
-        //the first node in this read
-        int posInInvertedIndex = lineElementsSet.get(orderLineTable[0]).getPosInInvertedIndex();
-        outputVerID.set(
-                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
-                (byte) 0);
-        desVtexAdjList.set(lineElementsSet.get(orderLineTable[1]).getAdjVertexList().get());
-        for (int i = 0; i < desVtexAdjList.getUsedSize(); i++) {
-            if (desVtexAdjList.getPosinReadListElement(i) == (byte) 0) {
-                srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
-            }
-        }
-        srcVtexAdjList.addELementToList(key.get(), (byte) 1);
-        outputVerID.set(
-                lineElementsSet.get(orderLineTable[0]).getAdjVertexList().get().getReadListElement(posInInvertedIndex),
-                (byte) 0);
-        srcAdjListWritable.set(srcVtexAdjList);
-        outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[0]).getVkmer());
-        output.collect(outputVerID, outputAdjacentList);
-        //srcVtexAdjList reset!!!!
-
-        for (int i = 1; i < lineElementsSet.size(); i++) {
-            desVtexAdjList.set(lineElementsSet.get(orderLineTable[i + 1]).getAdjVertexList().get());
-            boolean flag = false;
-            for (int j = 0; j < desVtexAdjList.getUsedSize(); j++) {
-                if (desVtexAdjList.getPosinReadListElement(j) == (byte) 0) {
-                    srcVtexAdjList.addELementToList(desVtexAdjList.getReadListElement(i), (byte) 0);
-                    flag = true;
-                }
-            }
-            if (flag = true) {
-                //doesm't merge
-                srcVtexAdjList.addELementToList(key.get(), (byte) (i + 1));
-                outputVerID.set(
-                        lineElementsSet.get(orderLineTable[i]).getAdjVertexList().get()
-                                .getReadListElement(posInInvertedIndex), lineElementsSet.get(orderLineTable[i])
-                                .getAdjVertexList().get().getPosinReadListElement(posInInvertedIndex));
-                srcAdjListWritable.set(srcVtexAdjList);
-                outputAdjacentList.set(srcAdjListWritable, lineElementsSet.get(orderLineTable[i]).getVkmer());
-            }
-            else {
-                //merge
-                desKmer.set(kmerFactory.getFirstKmerFromChain(1, lineElementsSet.get(orderLineTable[i+1]).getVkmer()));
-                srcKmer.set(lineElementsSet.get(orderLineTable[i]).getVkmer());
-                lineElementsSet.get(orderLineTable[i+1]).getVkmer().set(kmerFactory.mergeTwoKmer(srcKmer, desKmer));
-                orderLineTable[i+1] = orderLineTable[i];
-            }
-        }*/
-    }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
deleted file mode 100644
index 1e44903..0000000
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/LineBasedmappingWritable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import edu.uci.ics.genomix.type.PositionListWritable;
-
-public class LineBasedmappingWritable extends PositionListWritable{
-    byte posInRead;
-    
-    public LineBasedmappingWritable() {
-        super();
-        this.posInRead = -1;        
-    }
-
-    public LineBasedmappingWritable(int count, byte [] data, int offset, byte posInRead) {       
-        super(count, data, offset);
-        this.posInRead = posInRead;
-    }
-    
-    public void set(byte posInRead, PositionListWritable right) {
-        super.set(right);
-        this.posInRead = posInRead;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        this.posInRead = in.readByte();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        out.writeByte(this.posInRead);
-    }
-    
-    public int getPosInInvertedIndex() {
-        return this.posInRead;
-    }
-}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
new file mode 100644
index 0000000..c3c252a
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingMapper.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingMapper extends MapReduceBase implements
+        Mapper<KmerBytesWritable, PositionListWritable, PositionWritable, PositionListAndKmerWritable> {
+    
+    public PositionWritable VertexID;
+    public PositionWritable tempVertex;
+    public PositionListWritable listPosZeroInRead;
+    public PositionListWritable listPosNonZeroInRead;
+    public PositionListWritable tempPosList;
+    public PositionListAndKmerWritable outputListAndKmer;
+    @Override
+    public void configure(JobConf job) {
+        VertexID = new PositionWritable();
+        tempVertex = new PositionWritable();
+        listPosZeroInRead = new PositionListWritable();
+        listPosNonZeroInRead = new PositionListWritable();
+        tempPosList = new PositionListWritable();
+        outputListAndKmer = new PositionListAndKmerWritable();
+    }
+    @Override
+    public void map(KmerBytesWritable key, PositionListWritable value, OutputCollector<PositionWritable, PositionListAndKmerWritable> output,
+            Reporter reporter) throws IOException {
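+        // Split the positions covering this kmer into read-start positions
+        // (posInRead == 0) and mid-read positions, then emit the kmer to each
+        // position together with the neighbor positions it should link to.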
+        listPosZeroInRead.reset();
+        listPosNonZeroInRead.reset();
+        outputListAndKmer.reset();
+        for(int i = 0; i < value.getCountOfPosition(); i++) {
+            VertexID.set(value.getPosition(i));
+            if(VertexID.getPosInRead() == 0) {
+                listPosZeroInRead.append(VertexID);
+            }
+            else {
+                listPosNonZeroInRead.append(VertexID);
+            }
+        }
+        for(int i = 0; i < listPosZeroInRead.getCountOfPosition(); i++) {
+            VertexID.set(listPosZeroInRead.getPosition(i));
+            tempPosList.reset();
+            for (int j = 0; j < listPosNonZeroInRead.getCountOfPosition(); j++) {
+                tempVertex.set(listPosNonZeroInRead.getPosition(j));
+                if(tempVertex.getReadID() != VertexID.getReadID()) {
+                    int tempReadID = tempVertex.getReadID();
+                    byte tempPosInRead = (byte) (tempVertex.getPosInRead() - 1);
+                    tempVertex.set(tempReadID, tempPosInRead);
+                    tempPosList.append(tempVertex);
+                }
+            }
+            outputListAndKmer.set(tempPosList, key);
+            output.collect(VertexID, outputListAndKmer);
+        }
+        for(int i = 0; i < listPosNonZeroInRead.getCountOfPosition(); i++) {
+            VertexID.set(listPosNonZeroInRead.getPosition(i));
+            tempPosList.reset();
+            for (int j = 0; j < listPosZeroInRead.getCountOfPosition(); j++) {
+                tempVertex.set(listPosZeroInRead.getPosition(j));
+                if(tempVertex.getReadID() != VertexID.getReadID()) {
+                    tempPosList.append(tempVertex);
+                }
+            }
+            outputListAndKmer.set(tempPosList, key);
+            output.collect(VertexID, outputListAndKmer);
+        }
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
new file mode 100644
index 0000000..ee12661
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/DeepGraphBuildingReducer.java
@@ -0,0 +1,139 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class DeepGraphBuildingReducer extends MapReduceBase implements
+        Reducer<PositionWritable, PositionListAndKmerWritable, NodeWritable, NullWritable> {
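+    /*
+     * Tracks whether nodeToMerge still holds an unmerged node or has already
+     * been replaced by the right-hand node of the previous merge.
+     */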
+    public static class NodeToMergeState {
+        public static final byte NOT_UPDATE = 0;
+        public static final byte ASSIGNED_BY_RIGHTNODE = 1;
+        private byte state;
+
+        public NodeToMergeState() {
+            state = NOT_UPDATE;
+        }
+
+        public void setToNotUpdate() {
+            state = NOT_UPDATE;
+        }
+
+        public void setToAssignedByRightNode() {
+            state = ASSIGNED_BY_RIGHTNODE;
+        }
+
+        public String getState() {
+            switch (state) {
+                case NOT_UPDATE:
+                    return "NOT_UPDATE";
+                case ASSIGNED_BY_RIGHTNODE:
+                    return "ASSIGNED_BY_RIGHTNODE";
+            }
+            return "ERROR_STATE";
+        }
+    }
+
+    private PositionListAndKmerWritable nodeListAndKmer = new PositionListAndKmerWritable();
+    private PositionListAndKmerWritable nodeSuccListAndKmer = new PositionListAndKmerWritable();
+    private NodeWritable startNodeInRead = new NodeWritable();
+    private NodeWritable nodeToMerge = new NodeWritable();
+    private NodeWritable nodeToBeMerged = new NodeWritable();
+    private PositionListWritable incomingList = new PositionListWritable();
+    private PositionListWritable outgoingList = new PositionListWritable();
+    private NullWritable nullWritable = NullWritable.get();
+    private NodeToMergeState state = new NodeToMergeState();
+    private int KMER_SIZE;
+    public void configure(JobConf job) {
+        KMER_SIZE = job.getInt("sizeKmer", 0);
+    }
+    @Override
+    public void reduce(PositionWritable key, Iterator<PositionListAndKmerWritable> values,
+            OutputCollector<NodeWritable, NullWritable> output, Reporter reporter) throws IOException {
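+        // Values for a read arrive ordered by position (via the comparators
+        // configured in GraphBuildingDriver); stitch consecutive path nodes
+        // together and emit a node whenever the chain cannot be extended.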
+        int readID = key.getReadID();
+        byte posInRead = 0;
+        assembleNodeInitialization(readID, posInRead, values);
+        posInRead = (byte) (posInRead + 2);
+        //----LOOP
+        while (values.hasNext()) {
+            assembleNodeFromValues(readID, posInRead, values);
+            posInRead = (byte) (posInRead + 1);
+            if (nodeToMerge.isPathNode() && nodeToBeMerged.isPathNode()) {
+                nodeToMerge.mergeNext(nodeToBeMerged, KMER_SIZE);
+                state.setToNotUpdate();
+            }
+            else {
+                state.setToAssignedByRightNode();
+                output.collect(nodeToMerge, nullWritable);
+            }
+        }
+    }
+
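+    /*
+     * Build the node at (readID, posInRead) from the next value in the stream,
+     * promoting nodeToBeMerged to nodeToMerge when the previous node was emitted.
+     */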
+    public void assembleNodeFromValues(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+            throws IOException {
+        if (values.hasNext()) {
+            nodeListAndKmer.set(values.next());
+        } else {
+            throw new IOException("the size of values emerge bug!");
+        }
+        if (state.getState().equals("ASSIGNED_BY_RIGHTNODE")) {
+            nodeToMerge.set(nodeToBeMerged);
+        }
+        incomingList.reset();
+        incomingList.append(readID, (byte) (posInRead - 1));
+        if (nodeSuccListAndKmer.getVertexIDList() != null)
+            outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+        outgoingList.append(readID, (byte) (posInRead + 1));
+        nodeToBeMerged.setNodeID(readID, (byte) posInRead);
+        nodeToBeMerged.setIncomingList(incomingList);
+        nodeToBeMerged.setOutgoingList(outgoingList);
+        nodeToBeMerged.setKmer(nodeListAndKmer.getKmer());
+    }
+
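+    /*
+     * Consume the first values of a read and initialize both the read's start
+     * node and the first candidate node to merge.
+     */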
+    public void assembleNodeInitialization(int readID, byte posInRead, Iterator<PositionListAndKmerWritable> values)
+            throws IOException {
+        if (values.hasNext()) {
+            nodeListAndKmer.set(values.next());
+        } else {
+            throw new IOException("the size of values emerge bug!");
+        }
+        if (values.hasNext()) {
+            nodeSuccListAndKmer.set(values.next());
+        }
+        incomingList.reset();
+        incomingList.set(nodeSuccListAndKmer.getVertexIDList());
+        outgoingList.reset();
+        if (nodeSuccListAndKmer.getVertexIDList() != null)
+            outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+        outgoingList.append(readID, (byte) (posInRead + 1));
+        startNodeInRead.setNodeID(readID, posInRead);
+        startNodeInRead.setIncomingList(incomingList);
+        startNodeInRead.setOutgoingList(outgoingList);
+        startNodeInRead.setKmer(nodeListAndKmer.getKmer());
+        //---------
+        nodeListAndKmer.set(nodeSuccListAndKmer);
+        incomingList.reset();
+        incomingList.append(readID, posInRead);
+        if (values.hasNext()) {
+            nodeSuccListAndKmer.set(values.next());
+            if (nodeSuccListAndKmer.getVertexIDList() != null)
+                outgoingList.set(nodeSuccListAndKmer.getVertexIDList());
+        }
+        outgoingList.append(readID, (byte) (posInRead + 2));
+        nodeToMerge.setNodeID(readID, (byte) posInRead);
+        nodeToMerge.setIncomingList(incomingList);
+        nodeToMerge.setOutgoingList(outgoingList);
+        nodeToMerge.setKmer(nodeListAndKmer.getKmer());
+        state.setToNotUpdate();
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
new file mode 100644
index 0000000..acbc3f1
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -0,0 +1,112 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GraphBuildingDriver {
+
+    private static class Options {
+        @Option(name = "-inputpath", usage = "the input path", required = true)
+        public String inputPath;
+
+        @Option(name = "-outputpath", usage = "the output path", required = true)
+        public String outputPath;
+
+        @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+        public int numReducers;
+
+        @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+        public int sizeKmer;
+
+    }
+   
+    public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, String defaultConfPath)
+            throws IOException {
+
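+        /*
+         * Job 1: build an inverted index from each kmer to every (readID,
+         * posInRead) position at which it occurs.
+         */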
+        JobConf conf = new JobConf(GraphBuildingDriver.class);
+        conf.setInt("sizeKmer", sizeKmer);
+        if (defaultConfPath != null) {
+            conf.addResource(new Path(defaultConfPath));
+        }
+
+        conf.setJobName("graph building");
+        conf.setMapperClass(GraphInvertedIndexBuildingMapper.class);
+        conf.setReducerClass(GraphInvertedIndexBuildingReducer.class);
+
+        conf.setMapOutputKeyClass(KmerBytesWritable.class);
+        conf.setMapOutputValueClass(PositionWritable.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+        
+        conf.setOutputKeyClass(KmerBytesWritable.class);
+        conf.setOutputValueClass(PositionListWritable.class);
+        
+        FileInputFormat.setInputPaths(conf, new Path(inputPath));
+        FileOutputFormat.setOutputPath(conf, new Path(inputPath + "-step1"));
+        conf.setNumReduceTasks(numReducers);
+
+        FileSystem dfs = FileSystem.get(conf);
+        dfs.delete(new Path(inputPath + "-step1"), true);
+        JobClient.runJob(conf);
+        
+        //-------------
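+        /*
+         * Job 2: group the inverted index by readID to build the node-oriented
+         * graph.
+         */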
+        conf = new JobConf(GraphBuildingDriver.class);
+        if (defaultConfPath != null) {
+            conf.addResource(new Path(defaultConfPath));
+        }
+        conf.setJobName("deep build");
+        
+        conf.setMapperClass(DeepGraphBuildingMapper.class);
+        conf.setReducerClass(DeepGraphBuildingReducer.class);
+        
+        conf.setMapOutputKeyClass(PositionWritable.class);
+        conf.setMapOutputValueClass(PositionListAndKmerWritable.class);
+        
+        conf.setPartitionerClass(ReadIDPartitioner.class);
+        
+        conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
+        conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
+        
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+        
+        conf.setOutputKeyClass(NodeWritable.class);
+        conf.setOutputValueClass(NullWritable.class);
+        
+        FileInputFormat.setInputPaths(conf, new Path(inputPath + "-step1"));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+        conf.setNumReduceTasks(1);
+        dfs.delete(new Path(outputPath), true);
+        JobClient.runJob(conf);
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+        GraphBuildingDriver driver = new GraphBuildingDriver();
+        driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer, null);
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
similarity index 83%
rename from genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
rename to genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
index 7b65158..d5924c8 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
 
 import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
@@ -33,17 +33,16 @@
         /** first kmer */
         byte[] array = geneLine.getBytes();
         outputKmer.setByRead(array, 0);
         outputVertexID.set((int)key.get(), (byte)0);
         output.collect(outputKmer, outputVertexID);
         /** middle kmer */
-        for (int i = KMER_SIZE; i < array.length - 1; i++) {
+        for (int i = KMER_SIZE; i < array.length; i++) {
             outputKmer.shiftKmerWithNextChar(array[i]);
             outputVertexID.set((int)key.get(), (byte)(i - KMER_SIZE + 1));
             output.collect(outputKmer, outputVertexID);
         }
-        /** last kmer */
-        outputKmer.shiftKmerWithNextChar(array[array.length - 1]);
-        outputVertexID.set((int)key.get(), (byte)(array.length - 1 + 1));
-        output.collect(outputKmer, outputVertexID);
     }
 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
similarity index 92%
rename from genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
rename to genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
index 72827f2..d2b6476 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphInvertedIndexBuildingReducer.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.hadoop.valvetgraphbuilding;
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -18,6 +18,7 @@
     @Override
     public void reduce(KmerBytesWritable key, Iterator<PositionWritable> values,
             OutputCollector<KmerBytesWritable, PositionListWritable> output, Reporter reporter) throws IOException {
+        outputlist.reset();
         while (values.hasNext()) {
             outputlist.append(values.next());
         }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java
new file mode 100644
index 0000000..fff3faf
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/PositionListAndKmerWritable.java
@@ -0,0 +1,81 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
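+/*
+ * Pairs a list of adjacent positions with a kmer; emitted per vertex by
+ * DeepGraphBuildingMapper and consumed by DeepGraphBuildingReducer.
+ */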
+public class PositionListAndKmerWritable implements WritableComparable<PositionListAndKmerWritable> {
+    
+    private PositionListWritable vertexIDList;
+    private KmerBytesWritable kmer;
+    private int countOfKmer;
+    
+    public PositionListAndKmerWritable(){
+        countOfKmer = 0;
+        vertexIDList = new PositionListWritable();
+        kmer = new KmerBytesWritable();
+    }
+
+    public PositionListAndKmerWritable(int kmerSize) {
+        countOfKmer = 0;
+        vertexIDList = new PositionListWritable();
+        kmer = new KmerBytesWritable(kmerSize);
+    }
+
+    public int getCount() {
+        return countOfKmer;
+    }
+
+    public void setCount(int count) {
+        this.countOfKmer = count;
+    }
+
+    public void setVertexIDList(PositionListWritable posList) {
+        vertexIDList.set(posList);
+    }
+
+    public void reset() {
+        vertexIDList.reset();
+        countOfKmer = 0;
+    }
+
+    public PositionListWritable getVertexIDList() {
+        return vertexIDList;
+    }
+
+    public KmerBytesWritable getKmer() {
+        return kmer;
+    }
+
+    public void set(PositionListAndKmerWritable right) {
+        this.countOfKmer = right.countOfKmer;
+        this.vertexIDList.set(right.vertexIDList);
+        this.kmer.set(right.kmer);
+    }
+
+    public void set(PositionListWritable list, KmerBytesWritable kmer) {
+        this.vertexIDList.set(list);
+        this.kmer.set(kmer);
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.countOfKmer = in.readInt();
+        this.vertexIDList.readFields(in);
+        this.kmer.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.countOfKmer);
+        this.vertexIDList.write(out);
+        this.kmer.write(out);
+    }
+
+    @Override
+    public int compareTo(PositionListAndKmerWritable o) {
+        // ordering is never used; sorting and grouping are done on the map key
+        return 0;
+    }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
new file mode 100644
index 0000000..362ebf3
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/ReadIDPartitioner.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.genomix.hadoop.velvetgraphbuilding;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class ReadIDPartitioner implements Partitioner<PositionWritable, PositionListWritable>{
+    
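+    // Partition on readID only, so every kmer position of a read reaches the
+    // same reducer; sort order within the partition comes from the job's
+    // output key comparator.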
+    @Override
+    public int getPartition(PositionWritable key, PositionListWritable value, int numPartitions) {
+        return (key.getReadID() & Integer.MAX_VALUE) % numPartitions;
+    }
+
+    @Override
+    public void configure(JobConf arg0) {
+    }
+}