Merge branch 'genomix/fullstack_genomix' of https://code.google.com/p/hyracks into jianfeng/genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 1e0c23e..50baeb4 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -50,6 +50,10 @@
     public KmerBytesWritable(int k, byte[] storage, int offset) {
         setNewReference(k, storage, offset);
     }
+    
+    public KmerBytesWritable(int k, String kmer) {
+        setNewReference(kmer.length(), kmer.getBytes(), 0);
+    }
 
     /**
      * Initial Kmer space by kmerlength
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index ca47c10..d4dab00 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -39,7 +39,7 @@
     }
 
     public NodeWritable(int kmerSize) {
-        nodeID = new PositionWritable(0,(byte) 0);
+        nodeID = new PositionWritable(0, (byte) 0);
         forwardForwardList = new PositionListWritable();
         forwardReverseList = new PositionListWritable();
         reverseForwardList = new PositionListWritable();
@@ -47,6 +47,17 @@
         kmer = new KmerBytesWritable(kmerSize);
     }
 
+    public NodeWritable(PositionWritable nodeID, PositionListWritable FFList, PositionListWritable FRList,
+            PositionListWritable RFList, PositionListWritable RRList, KmerBytesWritable kmer) {
+        this(kmer.getKmerLength());
+        this.nodeID.set(nodeID);
+        forwardForwardList.set(FFList);
+        forwardReverseList.set(FRList);
+        reverseForwardList.set(RFList);
+        reverseReverseList.set(RRList);
+        kmer.set(kmer);
+    }
+
     public void setNodeID(PositionWritable ref) {
         this.setNodeID(ref.getReadID(), ref.getPosInRead());
     }
@@ -58,7 +69,7 @@
     public void setKmer(KmerBytesWritable right) {
         this.kmer.set(right);
     }
-    
+
     public void reset(int kmerSize) {
         nodeID.set(0, (byte) 0);
         forwardForwardList.reset();
@@ -71,7 +82,7 @@
     public PositionListWritable getFFList() {
         return forwardForwardList;
     }
-    
+
     public PositionListWritable getFRList() {
         return forwardReverseList;
     }
@@ -79,7 +90,7 @@
     public PositionListWritable getRFList() {
         return reverseForwardList;
     }
-    
+
     public PositionListWritable getRRList() {
         return reverseReverseList;
     }
@@ -101,8 +112,8 @@
         this.forwardReverseList.set(nextNode.forwardReverseList);
         kmer.mergeNextKmer(initialKmerSize, nextNode.getKmer());
     }
-    
-    public void mergeForwardPre(NodeWritable preNode, int initialKmerSize){
+
+    public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
         this.reverseForwardList.set(preNode.reverseForwardList);
         this.reverseReverseList.set(preNode.reverseReverseList);
         kmer.mergePreKmer(initialKmerSize, preNode.getKmer());
@@ -148,6 +159,18 @@
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (o instanceof NodeWritable) {
+            NodeWritable nw = (NodeWritable) o;
+            return (this.nodeID.equals(nw.nodeID) && this.forwardForwardList.equals(nw.forwardForwardList)
+                    && this.forwardReverseList.equals(nw.forwardReverseList)
+                    && this.reverseForwardList.equals(nw.reverseForwardList)
+                    && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+        }
+        return false;
+    }
+
+    @Override
     public String toString() {
         StringBuilder sbuilder = new StringBuilder();
         sbuilder.append('(');
@@ -159,15 +182,15 @@
         sbuilder.append(kmer.toString()).append(')');
         return sbuilder.toString();
     }
-    
-    public int inDegree(){
+
+    public int inDegree() {
         return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
     }
-    
-    public int outDegree(){
+
+    public int outDegree() {
         return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
     }
-    
+
     /*
      * Return if this node is a "path" compressible node, that is, it has an in-degree and out-degree of 1 
      */
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index 1b1b392..eca4a28 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -19,7 +19,9 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 
@@ -47,6 +49,13 @@
     public PositionListWritable(int count, byte[] data, int offset) {
         setNewReference(count, data, offset);
     }
+    
+    public PositionListWritable(List<PositionWritable> posns) {
+        this();
+        for (PositionWritable p : posns) {
+            append(p);
+        }
+    }
 
     public void setNewReference(int count, byte[] data, int offset) {
         this.valueCount = count;
@@ -198,4 +207,23 @@
         }
         return sbuilder.toString();
     }
+    
+    @Override
+    public int hashCode() {
+        return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PositionListWritable))
+            return false;
+        PositionListWritable other = (PositionListWritable) o;
+        if (this.valueCount != other.valueCount)
+            return false;
+        for (int i=0; i < this.valueCount; i++) {
+                if (!this.getPosition(i).equals(other.getPosition(i)))
+                    return false;
+        }
+        return true;
+    }
 }
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
index 8e08ca6..8895f5c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionWritable.java
@@ -147,7 +147,6 @@
                 return diff2;
             }
             return diff1;
-            //            return compareBytes(b1, s1, l1, b2, s2, l2);
         }
     }
 
diff --git a/genomix/genomix-hadoop/pom.xml b/genomix/genomix-hadoop/pom.xml
index 610092a..8ca2fa3 100755
--- a/genomix/genomix-hadoop/pom.xml
+++ b/genomix/genomix-hadoop/pom.xml
@@ -174,5 +174,11 @@
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.mrunit</groupId>
+			<artifactId>mrunit</artifactId>
+			<version>1.0.0</version>
+			<classifier>hadoop1</classifier>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
index 38dde9d..c2b0e52 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h3/MergePathsH3.java
@@ -40,6 +40,7 @@
         public static final byte IS_HEAD = 1 << 3;
         public static final byte IS_TAIL = 1 << 4;
         public static final byte IS_PSEUDOHEAD = 1 << 5;
+        public static final byte IS_COMPLETE = 1 << 6;
 
         public static String getFlagAsString(byte code) {
             // TODO: allow multiple flags to be set
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 063c5f7..b240833 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -35,7 +36,7 @@
      * Mapper class: Partition the graph using random pseudoheads.
      * Heads send themselves to their successors, and all others map themselves.
      */
-    private static class MergePathsH4Mapper extends MapReduceBase implements
+    public static class MergePathsH4Mapper extends MapReduceBase implements
             Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
         private static long randSeed;
         private Random randGenerator;
@@ -59,6 +60,7 @@
         private byte outFlag;
 
         public void configure(JobConf conf) {
+            
             randSeed = conf.getLong("randomSeed", 0);
             randGenerator = new Random(randSeed);
             probBeingRandomHead = conf.getFloat("probBeingRandomHead", 0.5f);
@@ -116,24 +118,35 @@
         public void map(PositionWritable key, MessageWritableNodeWithFlag value,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
                 throws IOException {
+            // Node may be marked as head b/c it's a real head or a real tail
+            headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
+            tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
+            outFlag = (byte) (headFlag | tailFlag);
+            
             // only PATH vertices are present. Find the ID's for my neighbors
             curNode.set(value.getNode());
             curID.set(curNode.getNodeID());
+            
             curHead = isNodeRandomHead(curID);
-            hasNext = setNextInfo(curNode);
-            hasPrev = setPrevInfo(curNode);
+            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
+            // We prevent merging towards non-path nodes
+            hasNext = setNextInfo(curNode) && tailFlag == 0;
+            hasPrev = setPrevInfo(curNode) && headFlag == 0;
             willMerge = false;
             
             reporter.setStatus("CHECK ME OUT");
-            System.out.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
+            System.err.println("mapping node" + curNode.toString() + " next:" + String.valueOf(hasNext) + " prev:" + String.valueOf(hasPrev));
 
             // TODO: need to update edges in neighboring nodes
-
+            
+            if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
+                // true HEAD met true TAIL. this path is complete
+                outFlag |= MessageFlag.FROM_SELF;
+                outputValue.set(outFlag, curNode);
+                output.collect(curID, outputValue);
+                return;
+            }
             if (hasNext || hasPrev) {
-                // Node may be marked as head b/c it's a real head or a real tail
-                headFlag = (byte) (MessageFlag.IS_HEAD & value.getFlag());
-                tailFlag = (byte) (MessageFlag.IS_TAIL & value.getFlag());
-                outFlag = (byte) (headFlag | tailFlag);
                 if (curHead) {
                     if (hasNext && !nextHead) {
                         // compress this head to the forward tail
@@ -181,12 +194,16 @@
                 }
             }
 
-            // if we didn't send ourselves to some other node, remap ourselves
+            // if we didn't send ourselves to some other node, remap ourselves for the next round
             if (!willMerge) {
                 outFlag |= MessageFlag.FROM_SELF;
                 outputValue.set(outFlag, curNode);
                 output.collect(curID, outputValue);
             }
+            else {
+                // TODO send update to this node's neighbors
+                //mos.getCollector(UPDATES_OUTPUT, reporter).collect(key, outputValue);
+            }
         }
     }
 
@@ -195,7 +212,10 @@
      */
     private static class MergePathsH4Reducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+        private MultipleOutputs mos;
+        public static final String COMPLETE_OUTPUT = "complete";
+        public static final String UPDATES_OUTPUT = "update";
+        
         private int KMER_SIZE;
         private MessageWritableNodeWithFlag inputValue;
         private MessageWritableNodeWithFlag outputValue;
@@ -209,13 +229,16 @@
         private byte outFlag;
 
         public void configure(JobConf conf) {
+            mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
+            inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             curNode = new NodeWritable(KMER_SIZE);
             prevNode = new NodeWritable(KMER_SIZE);
             nextNode = new NodeWritable(KMER_SIZE);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -223,10 +246,14 @@
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                // all single nodes must be remapped
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // FROM_SELF => remap self
-                    output.collect(key, inputValue);
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_HEAD) > 0 && (inputValue.getFlag() & MessageFlag.IS_TAIL) > 0) {
+                        // complete path (H & T meet in this node)
+                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    } else { 
+                        // FROM_SELF => no merging this round. remap self
+                        output.collect(key, inputValue);
+                    }
                 } else if ((inputValue.getFlag() & (MessageFlag.FROM_PREDECESSOR | MessageFlag.FROM_SUCCESSOR)) > 0) {
                     // FROM_PREDECESSOR | FROM_SUCCESSOR, but singleton?  error here!
                     throw new IOException("Only one value recieved in merge, but it wasn't from self!");
@@ -246,10 +273,12 @@
                         sawPrevNode = true;
                     } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         nextNode.set(inputValue.getNode());
-                        sawNextNode = false;
-                    } else {
+                        sawNextNode = true;
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                         curNode.set(inputValue.getNode());
                         sawCurNode = true;
+                    } else {
+                        throw new IOException("Unknown origin for merging node");
                     }
                     if (!values.hasNext()) {
                         break;
@@ -279,7 +308,7 @@
                 outputValue.set(outFlag, curNode);
                 if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
                     // True heads meeting tails => merge is complete for this node
-                    // TODO: send to the "complete" collector
+                    mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, outputValue);
                 } else {
                     output.collect(key, outputValue);
                 }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
new file mode 100644
index 0000000..6518532
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/removetips/RemoveTips.java
@@ -0,0 +1,152 @@
+package edu.uci.ics.genomix.hadoop.graphclean.removetips;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4;
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h4.MergePathsH4.MergePathsH4Mapper;
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class RemoveTips extends Configured implements Tool {
+
+    /*
+     * Mapper class: removes any tips by not mapping them at all
+     */
+    private static class RemoveTipsMapper extends MapReduceBase implements
+            Mapper<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+        private int KMER_SIZE;
+        private int removeTipsMinLength;
+
+        private MessageWritableNodeWithFlag outputValue;
+        private NodeWritable curNode;
+
+        public void configure(JobConf conf) {
+            removeTipsMinLength = conf.getInt("removeTipsMinLength", 0);
+            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            curNode = new NodeWritable(KMER_SIZE);
+        }
+
+        @Override
+        public void map(PositionWritable key, MessageWritableNodeWithFlag value,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+            curNode.set(value.getNode());
+            if ((curNode.inDegree() == 0 || curNode.outDegree() == 0)
+                    && curNode.getKmer().getKmerLength() < removeTipsMinLength) {
+                // kill this node by NOT mapping it.  Update my neighbors with a suicide note
+                //TODO: update neighbors by removing me from its list
+            } else {
+                outputValue.set(MessageFlag.FROM_SELF, curNode);
+                output.collect(key, value);
+            }
+        }
+    }
+
+    /*
+     * Reducer class: keeps mapped nodes 
+     */
+    private static class MergePathsH4Reducer extends MapReduceBase implements
+            Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
+
+        private int KMER_SIZE;
+        private MessageWritableNodeWithFlag inputValue;
+        private MessageWritableNodeWithFlag outputValue;
+        private NodeWritable curNode;
+        private NodeWritable prevNode;
+        private NodeWritable nextNode;
+        private boolean sawCurNode;
+        private boolean sawPrevNode;
+        private boolean sawNextNode;
+        private int count;
+        private byte outFlag;
+
+        public void configure(JobConf conf) {
+            KMER_SIZE = conf.getInt("sizeKmer", 0);
+            outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
+            curNode = new NodeWritable(KMER_SIZE);
+            prevNode = new NodeWritable(KMER_SIZE);
+            nextNode = new NodeWritable(KMER_SIZE);
+        }
+
+        @Override
+        public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
+                OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
+                throws IOException {
+
+            inputValue.set(values.next());
+            if (!values.hasNext()) {
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    // FROM_SELF => keep self
+                    output.collect(key, inputValue);
+                } else {
+                    throw new IOException("Only one value recieved in merge, but it wasn't from self!");
+                }
+            } else {
+                throw new IOException("Expected only one node during reduce... saw more");
+            }
+        }
+    }
+
+    /*
+     * Run one iteration of the mergePaths algorithm
+     */
+    public RunningJob run(String inputPath, String outputPath, JobConf baseConf) throws IOException {
+        JobConf conf = new JobConf(baseConf);
+        conf.setJarByClass(MergePathsH4.class);
+        conf.setJobName("MergePathsH4 " + inputPath);
+
+        FileInputFormat.addInputPath(conf, new Path(inputPath));
+        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+
+        conf.setInputFormat(SequenceFileInputFormat.class);
+        conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+        conf.setMapOutputKeyClass(PositionWritable.class);
+        conf.setMapOutputValueClass(MessageWritableNodeWithFlag.class);
+        conf.setOutputKeyClass(PositionWritable.class);
+        conf.setOutputValueClass(MessageWritableNodeWithFlag.class);
+
+        conf.setMapperClass(MergePathsH4Mapper.class);
+        conf.setReducerClass(MergePathsH4Reducer.class);
+
+        FileSystem.get(conf).delete(new Path(outputPath), true);
+
+        return JobClient.runJob(conf);
+    }
+
+    @Override
+    public int run(String[] arg0) throws Exception {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int res = ToolRunner.run(new Configuration(), new MergePathsH4(), args);
+        System.out.println("Ran the job fine!");
+        System.exit(res);
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
index 91415ed..f05797e 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/MessageWritableNodeWithFlag.java
@@ -7,6 +7,7 @@
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.WritableComparable;
 
+import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 
 /*
@@ -20,7 +21,7 @@
     public MessageWritableNodeWithFlag() {
         this(0);
     }
-    
+
     public MessageWritableNodeWithFlag(int k) {
         this.flag = 0;
         this.node = new NodeWritable(k);
@@ -30,6 +31,11 @@
         this.flag = flag;
         this.node = new NodeWritable(kmerSize);
     }
+    
+    public MessageWritableNodeWithFlag(byte flag, NodeWritable node) {
+        this(node.getKmer().getKmerLength());
+        set(flag, node);
+    }
 
     public void set(MessageWritableNodeWithFlag right) {
         set(right.getFlag(), right.getNode());
@@ -79,4 +85,19 @@
     public int getLength() {
         return node.getCount();
     }
+
+    @Override
+    public int hashCode() {
+//        return super.hashCode() + flag + node.hashCode();
+        return flag + node.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object rightObj) {
+        if (rightObj instanceof MessageWritableNodeWithFlag) {
+            MessageWritableNodeWithFlag rightMessage = (MessageWritableNodeWithFlag) rightObj;
+            return (this.flag == rightMessage.flag && this.node.equals(rightMessage.node));
+        }
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
index e7bcdf6..497e926 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/PathNodeInitial.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -57,10 +58,7 @@
         private int inDegree;
         private int outDegree;
         private NodeWritable emptyNode;
-
-        public PathNodeInitialMapper() {
-
-        }
+        private Iterator<PositionWritable> posIterator;
 
         public void configure(JobConf conf) {
             KMER_SIZE = conf.getInt("sizeKmer", 0);
@@ -80,47 +78,79 @@
                 outputValue.set(MessageFlag.FROM_SELF, key);
                 output.collect(key.getNodeID(), outputValue);
                 reporter.incrCounter("genomix", "path_nodes", 1);
-            } else if (outDegree == 1) {
-                // Not a path myself, but my successor might be one. Map forward successor
+            } else if (inDegree == 0 && outDegree == 1) {
+                // start of a tip.  needs to merge & be marked as head
+                outputValue.set(MessageFlag.FROM_SELF, key);
+                output.collect(key.getNodeID(), outputValue);
+                reporter.incrCounter("genomix", "path_nodes", 1);
+
                 outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
-                if (key.getFFList().getCountOfPosition() > 0) {
-                    outputKey.set(key.getFFList().getPosition(0));
-                } else {
-                    outputKey.set(key.getFRList().getPosition(0));
-                }
-                output.collect(outputKey, outputValue);
-            } else if (inDegree == 1) {
-                // Not a path myself, but my predecessor might be one.
+                output.collect(key.getNodeID(), outputValue);
+            } else if (inDegree == 1 && outDegree == 0) {
+                // end of a tip.  needs to merge & be marked as tail
+                outputValue.set(MessageFlag.FROM_SELF, key);
+                output.collect(key.getNodeID(), outputValue);
+                reporter.incrCounter("genomix", "path_nodes", 1);
+
                 outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
-                if (key.getRRList().getCountOfPosition() > 0) {
-                    outputKey.set(key.getRRList().getPosition(0));
-                } else {
-                    outputKey.set(key.getRFList().getPosition(0));
-                }
-                output.collect(outputKey, outputValue);
+                output.collect(key.getNodeID(), outputValue);
             } else {
-                // TODO: all other nodes will not participate-- should they be collected in a "complete" output?
+                if (outDegree > 0) {
+                    // Not a path myself, but my successor might be one. Map forward successor to find heads
+                    outputValue.set(MessageFlag.FROM_PREDECESSOR, emptyNode);
+                    posIterator = key.getFFList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                    posIterator = key.getFRList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                }
+                if (inDegree > 0) {
+                    // Not a path myself, but my predecessor might be one. map predecessor to find tails 
+                    outputValue.set(MessageFlag.FROM_SUCCESSOR, emptyNode);
+                    posIterator = key.getRRList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                    posIterator = key.getRFList().iterator();
+                    while (posIterator.hasNext()) {
+                        outputKey.set(posIterator.next());
+                        output.collect(outputKey, outputValue);
+                    }
+                }
+                // push this non-path node to the "complete" output
+                outputValue.set((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), key);
+                output.collect(key.getNodeID(), outputValue);
             }
         }
     }
 
     public static class PathNodeInitialReducer extends MapReduceBase implements
             Reducer<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag> {
-
+        private MultipleOutputs mos;
+        private static final String COMPLETE_OUTPUT = "complete";
         private int KMER_SIZE;
         private MessageWritableNodeWithFlag inputValue;
         private MessageWritableNodeWithFlag outputValue;
         private NodeWritable nodeToKeep;
         private int count;
         private byte flag;
+        private boolean isComplete;
 
         public void configure(JobConf conf) {
+            mos = new MultipleOutputs(conf);
             KMER_SIZE = conf.getInt("sizeKmer", 0);
             inputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             outputValue = new MessageWritableNodeWithFlag(KMER_SIZE);
             nodeToKeep = new NodeWritable(KMER_SIZE);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public void reduce(PositionWritable key, Iterator<MessageWritableNodeWithFlag> values,
                 OutputCollector<PositionWritable, MessageWritableNodeWithFlag> output, Reporter reporter)
@@ -128,41 +158,63 @@
 
             inputValue.set(values.next());
             if (!values.hasNext()) {
-                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // FROM_SELF => need to keep this PATH node
-                    output.collect(key, inputValue);
+                if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
+                    if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
+                        // non-path node.  Store in "complete" output
+                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    } else {
+                        // FROM_SELF => need to keep this PATH node
+                        output.collect(key, inputValue);
+                    }
                 }
             } else {
                 // multiple inputs => possible HEAD or TAIL to a path node. note if HEAD or TAIL node 
                 count = 0;
                 flag = MessageFlag.EMPTY_MESSAGE;
+                isComplete = false;
                 while (true) { // process values; break when no more
                     count++;
-                    if ((inputValue.getFlag() & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
+                    if ((inputValue.getFlag() & MessageFlag.FROM_SELF) > 0) {
                         // SELF -> keep this node
+                        flag |= MessageFlag.FROM_SELF;
                         nodeToKeep.set(inputValue.getNode());
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) == MessageFlag.FROM_SUCCESSOR) {
+                        if ((inputValue.getFlag() & MessageFlag.IS_COMPLETE) > 0) {
+                            isComplete = true;
+                        }
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_SUCCESSOR) > 0) {
                         flag |= MessageFlag.IS_TAIL;
-                        reporter.incrCounter("genomix", "path_nodes_tails", 1);
-                    } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) == MessageFlag.FROM_PREDECESSOR) {
+                    } else if ((inputValue.getFlag() & MessageFlag.FROM_PREDECESSOR) > 0) {
                         flag |= MessageFlag.IS_HEAD;
-                        reporter.incrCounter("genomix", "path_nodes_heads", 1);
                     }
                     if (!values.hasNext()) {
                         break;
                     } else {
-                        inputValue = values.next();
+                        inputValue.set(values.next());
                     }
                 }
                 if (count < 2) {
                     throw new IOException("Expected at least two nodes in PathNodeInitial reduce; saw "
                             + String.valueOf(count));
                 }
-                if ((flag & MessageFlag.FROM_SELF) == MessageFlag.FROM_SELF) {
-                    // only map simple path nodes
-                    outputValue.set(flag, nodeToKeep);
-                    output.collect(key, outputValue);
-                    reporter.incrCounter("genomix", "path_nodes", 1);
+                if ((flag & MessageFlag.FROM_SELF) > 0) {
+                    if ((flag & MessageFlag.IS_COMPLETE) > 0) {
+                        // non-path node.  Store in "complete" output
+                        mos.getCollector(COMPLETE_OUTPUT, reporter).collect(key, inputValue);
+                    } else {
+                        // only keep simple path nodes
+                        outputValue.set(flag, nodeToKeep);
+                        output.collect(key, outputValue);
+
+                        reporter.incrCounter("genomix", "path_nodes", 1);
+                        if ((flag & MessageFlag.IS_HEAD) > 0) {
+                            reporter.incrCounter("genomix", "path_nodes_heads", 1);
+                        }
+                        if ((flag & MessageFlag.IS_TAIL) > 0) {
+                            reporter.incrCounter("genomix", "path_nodes_tails", 1);
+                        }
+                    }
+                } else {
+                    throw new IOException("No SELF node recieved in reduce! key=" + key.toString() + " flag=" + flag);
                 }
             }
         }
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
index eaf2a6f..c735a0d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/velvetgraphbuilding/GraphBuildingDriver.java
@@ -52,6 +52,7 @@
     public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength,
             boolean onlyTest1stJob, boolean seqOutput, String defaultConfPath) throws IOException {
         if (onlyTest1stJob == true) {
+            
             runfirstjob(inputPath, numReducers, sizeKmer, readLength, seqOutput, defaultConfPath);
         } else {
             runfirstjob(inputPath, numReducers, sizeKmer, readLength, true, defaultConfPath);
@@ -113,6 +114,7 @@
 
         conf.setPartitionerClass(ReadIDPartitioner.class);
 
+        // grouping is done on the readID only; sorting is based on the (readID, abs(posn)) 
         conf.setOutputKeyComparatorClass(PositionWritable.Comparator.class);
         conf.setOutputValueGroupingComparator(PositionWritable.FirstComparator.class);
 
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
new file mode 100644
index 0000000..b142f87
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/TestPathNodeInitial.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.genomix.hadoop.pmcommon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mrunit.MapDriver;
+import org.apache.hadoop.mrunit.ReduceDriver;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.graphclean.mergepaths.h3.MergePathsH3.MessageFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.MessageWritableNodeWithFlag;
+import edu.uci.ics.genomix.hadoop.pmcommon.PathNodeInitial;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class TestPathNodeInitial {
+    PositionWritable posn1 = new PositionWritable(0, (byte) 1);
+    PositionWritable posn2 = new PositionWritable(1, (byte) 1);
+    PositionWritable posn3 = new PositionWritable(2, (byte) 1);
+    PositionWritable posn4 = new PositionWritable(3, (byte) 1);
+    PositionWritable posn5 = new PositionWritable(5, (byte) 1);
+    String kmerString = "ATGCA";
+    KmerBytesWritable kmer = new KmerBytesWritable(kmerString.length(), kmerString);
+    JobConf conf = new JobConf();
+    MultipleOutputs mos = new MultipleOutputs(conf); 
+
+    {
+        conf.set("sizeKmer", String.valueOf(kmerString.length()));
+    }
+
+    @Test
+    public void testNoNeighbors() throws IOException {
+        NodeWritable noNeighborNode = new NodeWritable(posn1, new PositionListWritable(), new PositionListWritable(),
+                new PositionListWritable(), new PositionListWritable(), kmer);
+        MessageWritableNodeWithFlag output = new MessageWritableNodeWithFlag((byte) (MessageFlag.FROM_SELF | MessageFlag.IS_COMPLETE), noNeighborNode);
+        // test mapper
+        new MapDriver<NodeWritable, NullWritable, PositionWritable, MessageWritableNodeWithFlag>()
+                .withMapper(new PathNodeInitial.PathNodeInitialMapper())
+                .withConfiguration(conf)
+                .withInput(noNeighborNode, NullWritable.get())
+                .withOutput(posn1, output)
+                .runTest();
+        // test reducer
+//        MultipleOutputs.addNamedOutput(conf, "complete", SequenceFileOutputFormat.class, PositionWritable.class, MessageWritableNodeWithFlag.class);
+        new ReduceDriver<PositionWritable, MessageWritableNodeWithFlag, PositionWritable, MessageWritableNodeWithFlag>()
+                .withReducer(new PathNodeInitial.PathNodeInitialReducer())
+                .withConfiguration(conf)
+                .withInput(posn1, Arrays.asList(output))
+                .runTest();
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
new file mode 100644
index 0000000..d3d52f3
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/api/io/binary/BinaryDataCleanVertexInputFormat.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.pregelix.api.io.binary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class BinaryDataCleanVertexInputFormat<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+
+    /** Uses the SequenceFileInputFormat to do everything */
+    @SuppressWarnings("rawtypes")
+    protected SequenceFileInputFormat binaryInputFormat = new SequenceFileInputFormat();
+
+    /**
+     * Abstract class to be implemented by the user based on their specific
+     * vertex input. Easiest to ignore the key value separator and only use key
+     * instead.
+     * 
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public static abstract class BinaryDataCleanVertexReader<I extends WritableComparable<?>, V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
+        /** Internal line record reader */
+        private final RecordReader<PositionWritable, ValueStateWritable> lineRecordReader;
+        /** Context passed to initialize */
+        private TaskAttemptContext context;
+
+        /**
+         * Initialize with the LineRecordReader.
+         * 
+         * @param recordReader
+         *            Line record reader from SequenceFileInputFormat
+         */
+        public BinaryDataCleanVertexReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
+            this.lineRecordReader = recordReader;
+        }
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            lineRecordReader.initialize(inputSplit, context);
+            this.context = context;
+        }
+
+        @Override
+        public void close() throws IOException {
+            lineRecordReader.close();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return lineRecordReader.getProgress();
+        }
+
+        /**
+         * Get the line record reader.
+         * 
+         * @return Record reader to be used for reading.
+         */
+        protected RecordReader<PositionWritable, ValueStateWritable> getRecordReader() {
+            return lineRecordReader;
+        }
+
+        /**
+         * Get the context.
+         * 
+         * @return Context passed to initialize.
+         */
+        protected TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // Ignore the hint of numWorkers here since we are using SequenceFileInputFormat
+        // to do this for us
+        return binaryInputFormat.getSplits(context);
+    }
+
+    @Override
+    public VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+        return null;
+    }
+
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 94b0c51..e135085 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -9,9 +9,9 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
new file mode 100644
index 0000000..140a703
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanInputFormat.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat;
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexInputFormat.BinaryDataCleanVertexReader;
+
+public class DataCleanInputFormat extends
+    BinaryDataCleanVertexInputFormat<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    /**
+     * Format INPUT
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public VertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new BinaryDataCleanLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class BinaryDataCleanLoadGraphReader extends
+    BinaryDataCleanVertexReader<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    private Vertex vertex;
+    private PositionWritable vertexId = new PositionWritable();
+    private ValueStateWritable vertexValue = new ValueStateWritable();
+
+    public BinaryDataCleanLoadGraphReader(RecordReader<PositionWritable, ValueStateWritable> recordReader) {
+        super(recordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> getCurrentVertex()
+            throws IOException, InterruptedException {
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        if (getRecordReader() != null) {
+            /**
+             * set the src vertex id
+             */
+            vertexId.set(getRecordReader().getCurrentKey());
+            vertex.setVertexId(vertexId);
+            /**
+             * set the vertex value
+             */
+            vertexValue.set(getRecordReader().getCurrentValue());
+            vertex.setVertexValue(vertexValue);
+        }
+
+        return vertex;
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
new file mode 100644
index 0000000..40abd3e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class DataCleanOutputFormat extends
+        BinaryVertexOutputFormat<PositionWritable, ValueStateWritable, NullWritable> {
+
+    @Override
+    public VertexWriter<PositionWritable, ValueStateWritable, NullWritable> createVertexWriter(
+            TaskAttemptContext context) throws IOException, InterruptedException {
+        @SuppressWarnings("unchecked")
+        RecordWriter<PositionWritable, ValueStateWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+        return new BinaryLoadGraphVertexWriter(recordWriter);
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+     */
+    public static class BinaryLoadGraphVertexWriter extends
+            BinaryVertexWriter<PositionWritable, ValueStateWritable, NullWritable> {
+        public BinaryLoadGraphVertexWriter(RecordWriter<PositionWritable, ValueStateWritable> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<PositionWritable, ValueStateWritable, NullWritable, ?> vertex)
+                throws IOException, InterruptedException {
+            getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
index fae1970..a0a8ea5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/AdjacencyListWritable.java
@@ -27,6 +27,10 @@
         reverseList.reset();
     }
     
+    public int getCountOfPosition(){
+    	return forwardList.getCountOfPosition() + reverseList.getCountOfPosition();
+    }
+    
     public PositionListWritable getForwardList() {
         return forwardList;
     }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
new file mode 100644
index 0000000..55ddb1b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -0,0 +1,185 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class MergeBubbleMessageWritable implements WritableComparable<MergeBubbleMessageWritable> {
+    /**
+     * sourceVertexId stores source vertexId when headVertex sends the message
+     * stores neighber vertexValue when pathVertex sends the message
+     * file stores the point to the file that stores the chains of connected DNA
+     */
+    private PositionWritable sourceVertexId;
+    private KmerBytesWritable chainVertexId;
+    private AdjacencyListWritable neighberNode; //incoming or outgoing
+    private byte message;
+    private PositionWritable startVertexId;
+
+    private byte checkMessage;
+
+    public MergeBubbleMessageWritable() {
+        sourceVertexId = new PositionWritable();
+        chainVertexId = new KmerBytesWritable(0);
+        neighberNode = new AdjacencyListWritable();
+        message = Message.NON;
+        startVertexId = new PositionWritable();
+        checkMessage = (byte) 0;
+    }
+    
+    public void set(MessageWritable msg) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(msg.getSourceVertexId());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(msg.getChainVertexId());
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(msg.getNeighberNode());
+        }
+        this.message = msg.getMessage();
+    }
+
+    public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+        this.message = message;
+    }
+
+    public void reset() {
+        checkMessage = 0;
+        chainVertexId.reset(1);
+        neighberNode.reset();
+        message = Message.NON;
+    }
+
+    public PositionWritable getSourceVertexId() {
+        return sourceVertexId;
+    }
+
+    public void setSourceVertexId(PositionWritable sourceVertexId) {
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+        }
+    }
+    
+    public KmerBytesWritable getChainVertexId() {
+        return chainVertexId;
+    }
+
+    public void setChainVertexId(KmerBytesWritable chainVertexId) {
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(chainVertexId);
+        }
+    }
+    
+    public AdjacencyListWritable getNeighberNode() {
+        return neighberNode;
+    }
+
+    public void setNeighberNode(AdjacencyListWritable neighberNode) {
+        if(neighberNode != null){
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(neighberNode);
+        }
+    }
+    
+    public int getLengthOfChain() {
+        return chainVertexId.getKmerLength();
+    }
+
+    public PositionWritable getStartVertexId() {
+        return startVertexId;
+    }
+
+    public void setStartVertexId(PositionWritable startVertexId) {
+        if(startVertexId != null){
+            checkMessage |= CheckMessage.START;
+            this.startVertexId.set(startVertexId);
+        }
+    }
+
+    public byte getMessage() {
+        return message;
+    }
+
+    public void setMessage(byte message) {
+        this.message = message;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeByte(checkMessage);
+        if ((checkMessage & CheckMessage.SOURCE) != 0)
+            sourceVertexId.write(out);
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.write(out);
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+            neighberNode.write(out);
+        if ((checkMessage & CheckMessage.START) != 0)
+            startVertexId.write(out);
+        out.write(message);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.reset();
+        checkMessage = in.readByte();
+        if ((checkMessage & CheckMessage.SOURCE) != 0)
+            sourceVertexId.readFields(in);
+        if ((checkMessage & CheckMessage.CHAIN) != 0)
+            chainVertexId.readFields(in);
+        if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+            neighberNode.readFields(in);
+        if ((checkMessage & CheckMessage.START) != 0)
+            startVertexId.readFields(in);
+        message = in.readByte();
+    }
+
+    @Override
+    public int hashCode() {
+        return sourceVertexId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MergeBubbleMessageWritable) {
+            MergeBubbleMessageWritable tp = (MergeBubbleMessageWritable) o;
+            return sourceVertexId.equals(tp.sourceVertexId);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return sourceVertexId.toString();
+    }
+
+    public int compareTo(MergeBubbleMessageWritable tp) {
+        return sourceVertexId.compareTo(tp.sourceVertexId);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index f0a6b58..6a71344 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -31,6 +31,23 @@
         message = Message.NON;
         checkMessage = (byte) 0;
     }
+    
+    public void set(MessageWritable msg) {
+        checkMessage = 0;
+        if (sourceVertexId != null) {
+            checkMessage |= CheckMessage.SOURCE;
+            this.sourceVertexId.set(msg.getSourceVertexId());
+        }
+        if (chainVertexId != null) {
+            checkMessage |= CheckMessage.CHAIN;
+            this.chainVertexId.set(msg.getChainVertexId());
+        }
+        if (neighberNode != null) {
+            checkMessage |= CheckMessage.NEIGHBER;
+            this.neighberNode.set(msg.getNeighberNode());
+        }
+        this.message = msg.getMessage();
+    }
 
     public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
         checkMessage = 0;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 985c2ac..7895da4 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -29,7 +29,7 @@
                 reverseForwardList, reverseReverseList,
                 state, mergeChain);
     }
-
+    
     public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
             PositionListWritable reverseForwardList, PositionListWritable reverseReverseList, 
             byte state, KmerBytesWritable mergeChain) {
@@ -41,6 +41,11 @@
         this.mergeChain.set(mergeChain);
     }
     
+    public void set(ValueStateWritable value) {
+        set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
+                value.getMergeChain());
+    }
+    
     public PositionListWritable getFFList() {
         return outgoingList.getForwardList();
     }
@@ -78,7 +83,7 @@
     }
 
     public void setIncomingList(AdjacencyListWritable incomingList) {
-        this.incomingList = incomingList;
+        this.incomingList.set(incomingList);
     }
 
     public AdjacencyListWritable getOutgoingList() {
@@ -86,7 +91,7 @@
     }
 
     public void setOutgoingList(AdjacencyListWritable outgoingList) {
-        this.outgoingList = outgoingList;
+        this.outgoingList.set(outgoingList);
     }
 
     public byte getState() {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
new file mode 100644
index 0000000..8716d8d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.genomix.pregelix.operator.bridgeremove;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10    0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class BridgeRemoveVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
+    public static final String LENGTH = "BridgeRemoveVertex.length";
+    public static int kmerSize = -1;
+    private int length = -1;
+
+    private MessageWritable incomingMsg = new MessageWritable();
+    private MessageWritable outgoingMsg = new MessageWritable();
+    private ArrayList<MessageWritable> receivedMsg = new ArrayList<MessageWritable>();
+    
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
+    /**
+     * initiate kmerSize, maxIteration
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if(length == -1)
+            length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+        outgoingMsg.reset();
+        receivedMsg.clear();
+    }
+    
+    /**
+     * head send message to all next nodes
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMFF);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMFR);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * head send message to all previous nodes
+     */
+    public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+        posIterator = value.getRFList().iterator(); // RFList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMRF);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
+        while(posIterator.hasNext()){
+            outgoingMsg.setMessage(AdjMessage.FROMRR);
+            outgoingMsg.setSourceVertexId(getVertexId());
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1) {
+            if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+                sendMsgToAllNextNodes(getVertexValue());
+            }
+           else if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+                sendMsgToAllPreviousNodes(getVertexValue());
+           }
+        }
+        else if (getSuperstep() == 2){
+            int i = 0;
+            while (msgIterator.hasNext()) {
+                if(i == 3)
+                    break;
+                receivedMsg.add(msgIterator.next());
+                i++;
+            }
+            if(receivedMsg.size() == 2){
+                if(getVertexValue().getLengthOfMergeChain() > length){
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    if(receivedMsg.get(0).getMessage() == AdjMessage.FROMFF 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR){
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFF);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFF 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFR);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFF);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR 
+                            && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                        sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+                        outgoingMsg.setMessage(AdjMessage.FROMFR);
+                        sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+                        deleteVertex(getVertexId());
+                    }
+                }
+            }
+        }
+        else if(getSuperstep() == 3){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+                    //remove incomingMsg.getSourceId from RR positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+                  //remove incomingMsg.getSourceId from RF positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+                  //remove incomingMsg.getSourceId from FR positionList
+                } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+                  //remove incomingMsg.getSourceId from FF positionList
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(BridgeRemoveVertex.class.getSimpleName());
+        job.setVertexClass(BridgeRemoveVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(DataCleanInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
new file mode 100644
index 0000000..e16cd20
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10    0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class BubbleMergeVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MergeBubbleMessageWritable> {
+    public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
+    public static final String ITERATIONS = "BubbleMergeVertex.iteration";
+    public static int kmerSize = -1;
+    private int maxIteration = -1;
+
+    private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
+    private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
+
+    private PositionWritable destVertexId = new PositionWritable();
+    private Iterator<PositionWritable> posIterator;
+    private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsg = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
+    private ArrayList<MergeBubbleMessageWritable> tmpMsg = new ArrayList<MergeBubbleMessageWritable>();
+    
+    /**
+     * initiate kmerSize, maxIteration
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if (maxIteration < 0)
+            maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+        outgoingMsg.reset();
+    }
+    /**
+     * get destination vertex
+     */
+    public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+        if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+            posIterator = value.getFFList().iterator();
+        else // #FRList() > 0
+            posIterator = value.getFRList().iterator();
+        return posIterator.next();
+    }
+
+    /**
+     * head send message to all next nodes
+     */
+    public void sendMsgToAllNextNodes(ValueStateWritable value) {
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+    
+    @Override
+    public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
+        initVertex();
+        if (getSuperstep() == 1) {
+            if(VertexUtil.isHeadVertex(getVertexValue())){
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsgToAllNextNodes(getVertexValue());
+            }
+        } else if (getSuperstep() == 2){
+            while (msgIterator.hasNext()) {
+                if(VertexUtil.isPathVertex(getVertexValue())){
+                    outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+                    destVertexId.set(getNextDestVertexId(getVertexValue()));
+                    sendMsg(destVertexId, outgoingMsg);
+                }
+            }
+        } else if (getSuperstep() == 3){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(!receivedMsg.containsKey(incomingMsg.getStartVertexId())){
+                    tmpMsg.clear();
+                    tmpMsg.add(incomingMsg);
+                    receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+                }
+                else{
+                    tmpMsg.clear();
+                    tmpMsg.addAll(receivedMsg.get(incomingMsg.getStartVertexId()));
+                    tmpMsg.add(incomingMsg);
+                    receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+                }
+            }
+            for(PositionWritable prevId : receivedMsg.keySet()){
+                tmpMsg = receivedMsg.get(prevId);
+                if(tmpMsg.size() > 1){
+                    //find the node with largest length of mergeChain
+                    boolean flag = true; //the same length
+                    int maxLength = tmpMsg.get(0).getLengthOfChain();
+                    PositionWritable max = tmpMsg.get(0).getSourceVertexId();
+                    for(int i = 1; i < tmpMsg.size(); i++){
+                        if(tmpMsg.get(i).getLengthOfChain() != maxLength)
+                            flag = false;
+                        if(tmpMsg.get(i).getLengthOfChain() > maxLength){
+                            maxLength = tmpMsg.get(i).getLengthOfChain();
+                            max = tmpMsg.get(i).getSourceVertexId();
+                        }
+                    }
+                    //send merge or unchange Message to node with largest length
+                    if(flag == true){
+                        //send unchange Message to node with largest length
+                        //we can send no message to complete this step
+                        //send delete Message to node which doesn't have largest length
+                        for(int i = 0; i < tmpMsg.size(); i++){
+                            if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+                                outgoingMsg.setMessage(AdjMessage.KILL);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            } else {
+                                outgoingMsg.setMessage(AdjMessage.UNCHANGE);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            }
+                        }
+                    } else{
+                        //send merge Message to node with largest length
+                        for(int i = 0; i < tmpMsg.size(); i++){
+                            if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+                                outgoingMsg.setMessage(AdjMessage.KILL);
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            } else {
+                                outgoingMsg.setMessage(AdjMessage.MERGE);
+                                /* add other node in message */
+                                sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+                            }
+                        }
+                    }
+                }
+            }
+        } else if (getSuperstep() == 4){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.KILL){
+                    deleteVertex(getVertexId());
+                } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
+                    //merge with small node
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(BubbleMergeVertex.class.getSimpleName());
+        job.setVertexClass(BubbleMergeVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(DataCleanInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index c3bb663..84c7f52 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
 import java.util.Iterator;
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index 4fcc09e..722206a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
 import java.util.Iterator;
 import org.apache.hadoop.io.NullWritable;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
similarity index 99%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 8a03aa7..5ba5f31 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.pregelix.operator;
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
 import java.util.Iterator;
 import org.apache.hadoop.io.NullWritable;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
new file mode 100644
index 0000000..4a174ec
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -0,0 +1,128 @@
+package edu.uci.ics.genomix.pregelix.operator.tipremove;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10    0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+/**
+ *  Remove tip or single node when l > constant
+ */
+public class TipRemoveVertex extends
+        Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+    public static final String KMER_SIZE = "TipRemoveVertex.kmerSize";
+    public static final String LENGTH = "TipRemoveVertex.length";
+    public static int kmerSize = -1;
+    private int length = -1;
+
+    private MessageWritable incomingMsg = new MessageWritable();
+    private MessageWritable outgoingMsg = new MessageWritable();
+    
+    /**
+     * initiate kmerSize, length
+     */
+    public void initVertex() {
+        if (kmerSize == -1)
+            kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+        if(length == -1)
+            length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+        outgoingMsg.reset();
+    }
+
+    @Override
+    public void compute(Iterator<MessageWritable> msgIterator) {
+        initVertex(); 
+        if(getSuperstep() == 1){
+            if(VertexUtil.isIncomingTipVertex(getVertexValue())){
+            	if(getVertexValue().getLengthOfMergeChain() > length){
+            		if(getVertexValue().getFFList().getCountOfPosition() > 0)
+            			outgoingMsg.setMessage(AdjMessage.FROMFF);
+            		else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+            			outgoingMsg.setMessage(AdjMessage.FROMFR);
+            		outgoingMsg.setSourceVertexId(getVertexId());
+            		deleteVertex(getVertexId());
+            	}
+            }
+            else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
+                if(getVertexValue().getLengthOfMergeChain() > length){
+                    if(getVertexValue().getRFList().getCountOfPosition() > 0)
+                        outgoingMsg.setMessage(AdjMessage.FROMRF);
+                    else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+                        outgoingMsg.setMessage(AdjMessage.FROMRR);
+                    outgoingMsg.setSourceVertexId(getVertexId());
+                    deleteVertex(getVertexId());
+                }
+            }
+            else if(VertexUtil.isSingleVertex(getVertexValue())){
+                if(getVertexValue().getLengthOfMergeChain() > length)
+                    deleteVertex(getVertexId());
+            }
+        }
+        else if(getSuperstep() == 2){
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+                    //remove incomingMsg.getSourceId from RR positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+                  //remove incomingMsg.getSourceId from RF positionList
+                } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+                  //remove incomingMsg.getSourceId from FR positionList
+                } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+                  //remove incomingMsg.getSourceId from FF positionList
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(TipRemoveVertex.class.getSimpleName());
+        job.setVertexClass(TipRemoveVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(DataCleanInputFormat.class);
+        job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(PositionWritable.class);
+        job.setOutputValueClass(ValueStateWritable.class);
+        Client.run(args, job);
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index b81491b..452f72d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -13,7 +13,6 @@
 
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
 import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
new file mode 100644
index 0000000..ca8d795
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class AdjMessage {
+    public static final byte FROMFF = 0;
+    public static final byte FROMFR = 1;
+    public static final byte FROMRF = 2;
+    public static final byte FROMRR = 3;
+    public static final byte NON = 4;
+    public static final byte UNCHANGE = 5;
+    public static final byte MERGE = 6;
+    public static final byte KILL = 7;
+    
+    public final static class ADJMESSAGE_CONTENT {
+        public static String getContentFromCode(byte code) {
+            String r = "";
+            switch (code) {
+                case FROMFF:
+                    r = "FROMFF";
+                    break;
+                case FROMFR:
+                    r = "FROMFR";
+                    break;
+                case FROMRF:
+                    r = "FROMRF";
+                    break;
+                case FROMRR:
+                    r = "FROMRR";
+                    break;
+                case NON:
+                    r = "NON";
+                    break;
+                case UNCHANGE:
+                    r = "UNCHANGE";
+                    break;
+                case MERGE:
+                    r = "MERGE";
+                    break;
+                case KILL:
+                    r = "KILL";
+                    break;
+            }
+            return r;
+        }
+    }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 6e6a97a..c7bcf48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -8,6 +8,7 @@
     public static final byte MESSAGE = 1 << 3;
     public static final byte STATE = 1 << 4;
     public static final byte LASTGENECODE = 1 << 5;
+    public static final byte START = 1 << 6;
 
     public final static class CheckMessage_CONTENT {
 
@@ -32,6 +33,9 @@
                 case LASTGENECODE:
                     r = "LASTGENECODE";
                     break;
+                case START:
+                    r = "START";
+                    break;
             }
             return r;
         }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index fa5f73b..4644383 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -8,7 +8,8 @@
     public static final byte STOP = 3;
     public static final byte FROMPSEUDOHEAD = 4;
     public static final byte FROMPSEUDOREAR = 5;
-    public static final byte FROMSELF = 6;
+    public static final byte IN = 6;
+    public static final byte OUT = 7;
 
     public final static class MESSAGE_CONTENT {
 
@@ -33,8 +34,11 @@
                 case FROMPSEUDOREAR:
                     r = "FROMPSEUDOREAR";
                     break;
-                case FROMSELF:
-                    r = "FROMSELF";
+                case IN:
+                    r = "IN";
+                    break;
+                case OUT:
+                    r = "OUT";
                     break;
             }
             return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 1740744..772690d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -47,4 +47,36 @@
         }
         return false;*/
     }
+    
+    /**
+     * check if vertex is a tip
+     */
+    public static boolean isIncomingTipVertex(ValueStateWritable value){
+    	return value.inDegree() == 0 && value.outDegree() == 1;
+    }
+    
+    public static boolean isOutgoingTipVertex(ValueStateWritable value){
+    	return value.inDegree() == 1 && value.outDegree() == 0;
+    }
+    
+    /**
+     * check if vertex is single
+     */
+    public static boolean isSingleVertex(ValueStateWritable value){
+        return value.inDegree() == 0 && value.outDegree() == 0;
+    }
+    
+    /**
+     * check if vertex is upbridge
+     */
+    public static boolean isUpBridgeVertex(ValueStateWritable value){
+        return value.inDegree() == 1 && value.outDegree() > 1;
+    }
+    
+    /**
+     * check if vertex is downbridge
+     */
+    public static boolean isDownBridgeVertex(ValueStateWritable value){
+        return value.inDegree() > 1 && value.outDegree() == 1;
+    }
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index cdc97c1..16b2794 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -9,9 +9,9 @@
 import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
 import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
 import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.pregelix.api.job.PregelixJob;