Merge branch 'wbiesing/debug' into genomix/fullstack_genomix

Conflicts:
	genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index fbcceaa..0582e08 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
 
 
 public class EdgeListWritable implements WritableComparable<EdgeListWritable>, Serializable, Iterable<EdgeWritable>{
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 700b3b5..e82c37d 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -559,7 +559,10 @@
     public int outDegree() {
         return edges[DirectionFlag.DIR_FF].getCountOfPosition() + edges[DirectionFlag.DIR_FR].getCountOfPosition();
     }
-
+    
+    public int getDegree(boolean prev){
+        return prev ? inDegree() : outDegree();
+    }
     /*
      * Return if this node is a "path" compressible node, that is, it has an
      * in-degree and out-degree of 1
diff --git a/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3 b/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3
new file mode 100644
index 0000000..0991038
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3
@@ -0,0 +1 @@
+1	ACAACA
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
index 988a88b..5b76446 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
@@ -32,7 +32,7 @@
 //        + "P2_8"
 //        + "SmallCycle"
 //        + "sameWithEdge"
-        + "FR_RF_Simple"
+        + "ThreeNodesCycle"
     };
 //          + "SimpleRectangle", PreFix + File.separator
 //          + "MediumRectangle", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc
new file mode 100644
index 0000000..891c286
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge
new file mode 100755
index 0000000..365db6a
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc
new file mode 100644
index 0000000..05bf27f
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000 b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000
new file mode 100755
index 0000000..a48ba91
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data
new file mode 100644
index 0000000..83a9a62
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data
@@ -0,0 +1,3 @@
+CAA	{[{AAC:[1]}]	[]	[]	[{ACA:[1]}]	{5':[], ~5':[]}		1.0x}
+ACA	{[{CAA:[1]}]	[]	[]	[{AAC:[1]}]	{5':[(1-0_0)], ~5':[]}		2.0x}
+AAC	{[{ACA:[1]}]	[]	[]	[{CAA:[1]}]	{5':[], ~5':[]}		1.0x}
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps
new file mode 100644
index 0000000..073789d
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps
@@ -0,0 +1,463 @@
+%!PS-Adobe-3.0
+%%Creator: graphviz version 2.26.3 (20100126.1600)
+%%Title: G
+%%Pages: (atend)
+%%BoundingBox: (atend)
+%%EndComments
+save
+%%BeginProlog
+/DotDict 200 dict def
+DotDict begin
+
+/setupLatin1 {
+mark
+/EncodingVector 256 array def
+ EncodingVector 0
+
+ISOLatin1Encoding 0 255 getinterval putinterval
+EncodingVector 45 /hyphen put
+
+% Set up ISO Latin 1 character encoding
+/starnetISO {
+        dup dup findfont dup length dict begin
+        { 1 index /FID ne { def }{ pop pop } ifelse
+        } forall
+        /Encoding EncodingVector def
+        currentdict end definefont
+} def
+/Times-Roman starnetISO def
+/Times-Italic starnetISO def
+/Times-Bold starnetISO def
+/Times-BoldItalic starnetISO def
+/Helvetica starnetISO def
+/Helvetica-Oblique starnetISO def
+/Helvetica-Bold starnetISO def
+/Helvetica-BoldOblique starnetISO def
+/Courier starnetISO def
+/Courier-Oblique starnetISO def
+/Courier-Bold starnetISO def
+/Courier-BoldOblique starnetISO def
+cleartomark
+} bind def
+
+%%BeginResource: procset graphviz 0 0
+/coord-font-family /Times-Roman def
+/default-font-family /Times-Roman def
+/coordfont coord-font-family findfont 8 scalefont def
+
+/InvScaleFactor 1.0 def
+/set_scale {
+       dup 1 exch div /InvScaleFactor exch def
+       scale
+} bind def
+
+% styles
+/solid { [] 0 setdash } bind def
+/dashed { [9 InvScaleFactor mul dup ] 0 setdash } bind def
+/dotted { [1 InvScaleFactor mul 6 InvScaleFactor mul] 0 setdash } bind def
+/invis {/fill {newpath} def /stroke {newpath} def /show {pop newpath} def} bind def
+/bold { 2 setlinewidth } bind def
+/filled { } bind def
+/unfilled { } bind def
+/rounded { } bind def
+/diagonals { } bind def
+
+% hooks for setting color 
+/nodecolor { sethsbcolor } bind def
+/edgecolor { sethsbcolor } bind def
+/graphcolor { sethsbcolor } bind def
+/nopcolor {pop pop pop} bind def
+
+/beginpage {	% i j npages
+	/npages exch def
+	/j exch def
+	/i exch def
+	/str 10 string def
+	npages 1 gt {
+		gsave
+			coordfont setfont
+			0 0 moveto
+			(\() show i str cvs show (,) show j str cvs show (\)) show
+		grestore
+	} if
+} bind def
+
+/set_font {
+	findfont exch
+	scalefont setfont
+} def
+
+% draw text fitted to its expected width
+/alignedtext {			% width text
+	/text exch def
+	/width exch def
+	gsave
+		width 0 gt {
+			[] 0 setdash
+			text stringwidth pop width exch sub text length div 0 text ashow
+		} if
+	grestore
+} def
+
+/boxprim {				% xcorner ycorner xsize ysize
+		4 2 roll
+		moveto
+		2 copy
+		exch 0 rlineto
+		0 exch rlineto
+		pop neg 0 rlineto
+		closepath
+} bind def
+
+/ellipse_path {
+	/ry exch def
+	/rx exch def
+	/y exch def
+	/x exch def
+	matrix currentmatrix
+	newpath
+	x y translate
+	rx ry scale
+	0 0 1 0 360 arc
+	setmatrix
+} bind def
+
+/endpage { showpage } bind def
+/showpage { } def
+
+/layercolorseq
+	[	% layer color sequence - darkest to lightest
+		[0 0 0]
+		[.2 .8 .8]
+		[.4 .8 .8]
+		[.6 .8 .8]
+		[.8 .8 .8]
+	]
+def
+
+/layerlen layercolorseq length def
+
+/setlayer {/maxlayer exch def /curlayer exch def
+	layercolorseq curlayer 1 sub layerlen mod get
+	aload pop sethsbcolor
+	/nodecolor {nopcolor} def
+	/edgecolor {nopcolor} def
+	/graphcolor {nopcolor} def
+} bind def
+
+/onlayer { curlayer ne {invis} if } def
+
+/onlayers {
+	/myupper exch def
+	/mylower exch def
+	curlayer mylower lt
+	curlayer myupper gt
+	or
+	{invis} if
+} def
+
+/curlayer 0 def
+
+%%EndResource
+%%EndProlog
+%%BeginSetup
+14 default-font-family set_font
+1 setmiterlimit
+% /arrowlength 10 def
+% /arrowwidth 5 def
+
+% make sure pdfmark is harmless for PS-interpreters other than Distiller
+/pdfmark where {pop} {userdict /pdfmark /cleartomark load put} ifelse
+% make '<<' and '>>' safe on PS Level 1 devices
+/languagelevel where {pop languagelevel}{1} ifelse
+2 lt {
+    userdict (<<) cvn ([) cvn load put
+    userdict (>>) cvn ([) cvn load put
+} if
+
+%%EndSetup
+setupLatin1
+%%Page: 1 1
+%%PageBoundingBox: 36 36 380 218
+%%PageOrientation: Portrait
+0 0 1 beginpage
+gsave
+36 36 344 182 boxprim clip newpath
+1 1 set_scale 0 rotate 40 41 translate
+% CAA
+gsave
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 0 moveto
+0 104 lineto
+56 104 lineto
+56 0 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+12.5 87.4 moveto 31 (CAA) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 78 moveto
+56 78 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+14 61.4 moveto 28 (5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 52 moveto
+56 52 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+8 35.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 26 moveto
+56 26 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+16.5 9.4 moveto 23 (1.0) alignedtext
+grestore
+% AAC
+gsave
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 70 moveto
+140 174 lineto
+196 174 lineto
+196 70 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+152.5 157.4 moveto 31 (AAC) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 148 moveto
+196 148 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+154 131.4 moveto 28 (5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 122 moveto
+196 122 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+148 105.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 96 moveto
+196 96 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+156.5 79.4 moveto 23 (1.0) alignedtext
+grestore
+% CAA->AAC
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 56.09 79.53 moveto
+61.72 84.13 67.82 88.52 74 92 curveto
+91.22 101.69 112.01 108.74 129.66 113.56 curveto
+stroke
+0 0 0 edgecolor
+newpath 129.07 117.02 moveto
+139.63 116.14 lineto
+130.83 110.24 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 129.07 117.02 moveto
+139.63 116.14 lineto
+130.83 110.24 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+75.5 116.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA
+gsave
+0 0 0.75294 nodecolor
+newpath 280 2 moveto
+280 106 lineto
+336 106 lineto
+336 2 lineto
+closepath fill
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 2 moveto
+280 106 lineto
+336 106 lineto
+336 2 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+293 89.4 moveto 30 (ACA) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 80 moveto
+336 80 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+289.5 63.4 moveto 37 (5':[1]) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 54 moveto
+336 54 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+288 37.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 28 moveto
+336 28 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+296.5 11.4 moveto 23 (2.0) alignedtext
+grestore
+% CAA->ACA
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 56.23 48.72 moveto
+89.61 45.25 146.84 40.59 196 43 curveto
+220.65 44.21 248.25 46.9 269.88 49.32 curveto
+stroke
+0 1 1 edgecolor
+newpath 269.65 52.82 moveto
+279.98 50.48 lineto
+270.44 45.86 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 269.65 52.82 moveto
+279.98 50.48 lineto
+270.44 45.86 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+144 48.4 moveto 48 (RR: [1]) alignedtext
+grestore
+% AAC->CAA
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 139.97 84.05 moveto
+134.54 78.66 128.49 73.7 122 70 curveto
+105.22 60.44 84.31 55.85 66.47 53.69 curveto
+stroke
+0 1 1 edgecolor
+newpath 66.68 50.19 moveto
+56.38 52.68 lineto
+65.98 57.16 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 66.68 50.19 moveto
+56.38 52.68 lineto
+65.98 57.16 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+74 75.4 moveto 48 (RR: [1]) alignedtext
+grestore
+% AAC->ACA
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 196.05 117.81 moveto
+215.46 114.07 241.4 107.38 262 96 curveto
+265.42 94.11 268.78 91.92 272.05 89.55 curveto
+stroke
+0 0 0 edgecolor
+newpath 274.22 92.29 moveto
+279.92 83.36 lineto
+269.9 86.79 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 274.22 92.29 moveto
+279.92 83.36 lineto
+269.9 86.79 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+215.5 118.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA->CAA
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 279.99 42.97 moveto
+257.65 34.8 225.35 24.35 196 20 curveto
+171.38 16.35 164.65 16.55 140 20 curveto
+114.76 23.54 87.22 31.35 65.77 38.37 curveto
+stroke
+0 0 0 edgecolor
+newpath 64.44 35.12 moveto
+56.07 41.62 lineto
+66.66 41.76 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 64.44 35.12 moveto
+56.07 41.62 lineto
+66.66 41.76 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+145.5 25.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA->AAC
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 279.87 57.34 moveto
+260.42 60.54 234.47 66.64 214 78 curveto
+210.56 79.91 207.2 82.13 203.94 84.55 curveto
+stroke
+0 1 1 edgecolor
+newpath 201.69 81.87 moveto
+196.12 90.89 lineto
+206.09 87.31 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 201.69 81.87 moveto
+196.12 90.89 lineto
+206.09 87.31 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+214 83.4 moveto 48 (RR: [1]) alignedtext
+grestore
+endpage
+showpage
+grestore
+%%PageTrailer
+%%EndPage: 1
+%%Trailer
+%%Pages: 1
+%%BoundingBox: 36 36 380 218
+end
+restore
+%%EOF
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index dd2f08c..e60c457 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -11,7 +11,6 @@
 import edu.uci.ics.genomix.type.EdgeWritable;
 import edu.uci.ics.genomix.type.KmerBytesWritable;
 import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
 public class VertexValueWritable 
     extends NodeWritable{
@@ -250,14 +249,14 @@
     }
     
     
-    /**
-     * Process any changes to value.  This is for edge updates.  nodeToAdd should be only edge
-     */
-    public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
-    	// TODO remove this function (use updateEdges)
-        byte replaceDir = mirrorDirection(deleteDir);
-        this.getNode().updateEdges(deleteDir, toDelete, updateDir, replaceDir, other, true);
-    }
+//    /**
+//     * Process any changes to value.  This is for edge updates.  nodeToAdd should be only edge
+//     */
+//    public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
+//    	// TODO remove this function (use updateEdges)
+//        byte replaceDir = mirrorDirection(deleteDir);
+//        this.getNode().updateEdges(deleteDir, toDelete, updateDir, replaceDir, other, true);
+//    }
     
     public void processFinalUpdates(byte deleteDir, byte updateDir, NodeWritable other){
         byte replaceDir = mirrorDirection(deleteDir);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 0d5448a..f29513a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -25,9 +25,11 @@
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 import edu.uci.ics.genomix.type.EdgeListWritable;
+import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
 import edu.uci.ics.genomix.type.VKmerBytesWritable;
 import edu.uci.ics.genomix.type.VKmerListWritable;
 import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
 
 public abstract class BasicGraphCleanVertex<V extends VertexValueWritable, M extends MessageWritable> extends
         Vertex<VKmerBytesWritable, V, NullWritable, M> {
@@ -510,6 +512,24 @@
         return true;
     }
     
+    
+    /**
+     * check if A needs to be flipped with neighbor
+     */
+    public boolean ifFlipWithNeighbor(boolean withPrecessor){
+        if(withPrecessor){
+            if(getVertexValue().getRRList().isEmpty())
+                return true;
+            else
+                return false;
+        } else{
+            if(getVertexValue().getFFList().isEmpty())
+                return true;
+            else
+                return false;
+        }
+    }
+    
     /**
      * check if A need to be filpped with predecessor
      */
@@ -545,42 +565,19 @@
     }
     
     /**
-     * set adjMessage to predecessor(from successor)
+     * set the neighbor-to-me direction in outFlag
      */
-    public void setPredecessorToMeDir(){
+    public void setNeighborToMeDir(boolean predecessorToMe){
+        if(getVertexValue().getDegree(predecessorToMe) != 1)
+            throw new IllegalArgumentException("In merge dir, the degree is not 1");
+        byte[] dirs = predecessorToMe ? IncomingListFlag.values : OutgoingListFlag.values;
         outFlag &= MessageFlag.DIR_CLEAR;
-        if(!getVertexValue().getRFList().isEmpty())
-            outFlag |= MessageFlag.DIR_RF;
-        else if(!getVertexValue().getRRList().isEmpty())
-            outFlag |= MessageFlag.DIR_RR;
-    }
-    
-    public void setPredecessorToMeDir(VKmerBytesWritable toFind){
-        outFlag &= MessageFlag.DIR_CLEAR;  // TODO WHAT HAPPENS IF THE NODE IS IN YOUR R AND YOUR RF?
-        if(getVertexValue().getRFList().contains(toFind))
-            outFlag |= MessageFlag.DIR_RF;
-        else if(getVertexValue().getRRList().contains(toFind))
-            outFlag |= MessageFlag.DIR_RR;
-    }
-    
-    /**
-     * set adjMessage to successor(from predecessor)
-     */
-    public void setSuccessorToMeDir(){
-        outFlag &= MessageFlag.DIR_CLEAR;
-        if(!getVertexValue().getFFList().isEmpty()) // TODO == 1 rather than isEmpty
-            outFlag |= MessageFlag.DIR_FF;
-        else if(!getVertexValue().getFRList().isEmpty())
-            outFlag |= MessageFlag.DIR_FR;
-        // TODO else exception
-    }
-    
-    public void setSuccessorToMeDir(VKmerBytesWritable toFind){ // TODO you should never have to FIND...
-        outFlag &= MessageFlag.DIR_CLEAR;
-        if(getVertexValue().getFFList().contains(toFind))
-            outFlag |= MessageFlag.DIR_FF;
-        else if(getVertexValue().getFRList().contains(toFind))
-            outFlag |= MessageFlag.DIR_FR;
+        
+        if(getVertexValue().getEdgeList(dirs[0]).getCountOfPosition() == 1){
+            outFlag |= dirs[0];
+        } else if(getVertexValue().getEdgeList(dirs[1]).getCountOfPosition() == 1){
+            outFlag |= dirs[1];
+        }
     }
     
     /**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 16b7ab3..f3d4d8f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -14,7 +14,25 @@
 
 public abstract class BasicPathMergeVertex<V extends VertexValueWritable, M extends PathMergeMessageWritable> extends
 	BasicGraphCleanVertex<V, M>{
-	
+    protected static final boolean isP1 = true;
+    protected static final boolean isP2 = false;
+    protected static final boolean isP4 = true;
+    
+    protected static final boolean toPredecessor = true;
+    protected static final boolean toSuccessor = false;
+    protected static final boolean mergeWithPrev = true;
+    protected static final boolean mergeWithNext = false;
+    protected static final boolean predecessorToMe = true;
+    protected static final boolean successorToMe = false;
+    
+    public void setStateAsMergeDir(boolean mergeWithPre){
+        short state = getVertexValue().getState();
+        state &= State.CAN_MERGE_CLEAR;
+        state |= mergeWithPre ? State.CAN_MERGEWITHPREV : State.CAN_MERGEWITHNEXT;
+        getVertexValue().setState(state);
+        activate();
+    }
+    
     public void setStateAsMergeWithPrev(){
         short state = getVertexValue().getState();
         state &= State.CAN_MERGE_CLEAR;
@@ -34,17 +52,17 @@
     /**
      * updateAdjList
      */
-    public void processUpdate(){
+    public void processUpdate(M msg){
     	// A -> B -> C with B merging with C
-        inFlag = incomingMsg.getFlag();
+        inFlag = msg.getFlag();
         byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);  // A -> B dir
         byte neighborToMeDir = mirrorDirection(meToNeighborDir);  // B -> A dir
         
         // TODO if you want, this logic could be figured out when sending the update from B
-        byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());  // A -> C after the merge
-         // TODO add C -> A dir and call node.updateEdges directly
-        getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
-                neighborToMergeDir, incomingMsg.getNode());
+        byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());  // A -> C after the merge
+        byte replaceDir = mirrorDirection(neighborToMeDir); // C -> A dir
+        getVertexValue().getNode().updateEdges(neighborToMeDir, msg.getSourceVertexId(), 
+                neighborToMergeDir, replaceDir, msg.getNode(), true);
     }
     
     /**
@@ -73,13 +91,6 @@
         getVertexValue().getEdgeList(neighborToMeDir).unionAdd(edge);
     }
     
-    /**
-     * merge and updateAdjList merge with one neighbor
-     */
-    public void processMerge(){ // TODO remove me
-        processMerge(incomingMsg);
-    }
-    
     public byte flipHeadMergeDir(byte d, boolean isFlip){
         if(isFlip){
             switch(d){
@@ -133,123 +144,52 @@
         getVertexValue().processMerges(neighborToMeDir, msg.getNode(), kmerSize);
     }
     
+    
     /**
-     * configure UPDATE msg   boolean: true == P4, false == P2
+     * send UPDATE msg; isP4: true == P4, false == P2
      */
-    public void configureUpdateMsgForPredecessor(boolean isP4){ 
+    public void sendUpdateMsg(boolean isP4, boolean toPredecessor){ 
         outgoingMsg.setSourceVertexId(getVertexId());
-        // TODO pass in isForward
-        for(byte d: OutgoingListFlag.values)
-            outgoingMsg.getNode().setEdgeList(d, getVertexValue().getEdgeList(d));  // TODO check
-        
         // TODO pass in the vertexId rather than isP4 (removes this block)
         if(isP4)
-            outgoingMsg.setFlip(ifFilpWithSuccessor());
+            outgoingMsg.setFlip(ifFlipWithNeighbor(!toPredecessor)); //ifFilpWithSuccessor()
         else 
             outgoingMsg.setFlip(ifFilpWithSuccessor(incomingMsg.getSourceVertexId()));
         
-        kmerIterator = getVertexValue().getRFList().getKeys();
-        while(kmerIterator.hasNext()){
-            outFlag &= MessageFlag.DIR_CLEAR;
-            outFlag |= MessageFlag.DIR_RF;
-            outgoingMsg.setFlag(outFlag);
-            destVertexId.setAsCopy(kmerIterator.next());
-            // TODO DON'T NEED TO SEARCH for this
-//            setPredecessorToMeDir(destVertexId); 
-            sendMsg(destVertexId, outgoingMsg);
-        }
-        kmerIterator = getVertexValue().getRRList().getKeys();
-        while(kmerIterator.hasNext()){
-            outFlag &= MessageFlag.DIR_CLEAR;
-            outFlag |= MessageFlag.DIR_RR;
-            outgoingMsg.setFlag(outFlag);
-            destVertexId.setAsCopy(kmerIterator.next());
-//            setPredecessorToMeDir(destVertexId);
-            sendMsg(destVertexId, outgoingMsg);
-        }
-    }
-    
-    public void configureUpdateMsgForSuccessor(boolean flag){
-        outgoingMsg.setSourceVertexId(getVertexId());
-        for(byte d: IncomingListFlag.values)
-            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
+        byte[] mergeDirs = toPredecessor ? OutgoingListFlag.values : IncomingListFlag.values;
+        byte[] updateDirs = toPredecessor ? IncomingListFlag.values : OutgoingListFlag.values;
         
-        if(flag)
-            outgoingMsg.setFlip(ifFlipWithPredecessor()); 
-        else
-            outgoingMsg.setFlip(ifFlipWithPredecessor(incomingMsg.getSourceVertexId()));
+        for(byte dir : mergeDirs)
+            outgoingMsg.getNode().setEdgeList(dir, getVertexValue().getEdgeList(dir));
         
-        kmerIterator = getVertexValue().getFFList().getKeys();
-        while(kmerIterator.hasNext()){
-            outFlag &= MessageFlag.DIR_CLEAR;
-            outFlag |= MessageFlag.DIR_FF;
-            outgoingMsg.setFlag(outFlag);
-            destVertexId.setAsCopy(kmerIterator.next());
-//            setSuccessorToMeDir(destVertexId);
-            sendMsg(destVertexId, outgoingMsg);
-        }
-        kmerIterator = getVertexValue().getFRList().getKeys();
-        while(kmerIterator.hasNext()){
-            outFlag &= MessageFlag.DIR_CLEAR;
-            outFlag |= MessageFlag.DIR_FR;
-            outgoingMsg.setFlag(outFlag);
-            destVertexId.setAsCopy(kmerIterator.next());
-//            setSuccessorToMeDir(destVertexId);
-            sendMsg(destVertexId, outgoingMsg);
+        for(byte dir : updateDirs){
+            kmerIterator = getVertexValue().getEdgeList(dir).getKeys();
+            while(kmerIterator.hasNext()){
+                outFlag &= MessageFlag.DIR_CLEAR;
+                outFlag |= dir;
+                outgoingMsg.setFlag(outFlag);
+                destVertexId.setAsCopy(kmerIterator.next()); //TODO does destVertexId need deep copy?
+                sendMsg(destVertexId, outgoingMsg);
+            }
         }
     }
-    
-	/**
-     * send update message to neighber  boolean: true == P4, false == P2
-     */
-    public void broadcastUpdateMsg(boolean flag){
-//        if((getVertexValue().getState() & State.VERTEX_MASK) == State.IS_HEAD && (outFlag & State.VERTEX_MASK) != State.IS_FINAL)
-//            outFlag |= MessageFlag.IS_HEAD;
-        switch(getVertexValue().getState() & State.CAN_MERGE_MASK){
-            case State.CAN_MERGEWITHPREV:
-                /** confugure updateMsg for successor **/
-                configureUpdateMsgForSuccessor(flag);
-                break;
-            case State.CAN_MERGEWITHNEXT:
-                /** confugure updateMsg for predecessor **/
-                configureUpdateMsgForPredecessor(flag);
-                break; 
-        }
-    }
-    
 
     /**
-     * This vertex tries to merge with next vertex and send update msg to predecesspr
-     */
-    public void sendUpdateMsgToPredecessor(boolean flag){
-        if(getVertexValue().hasNextDest())  //TODO delete
-            broadcastUpdateMsg(flag);   
-    }
-    
-    /**
-     * This vertex tries to merge with next vertex and send update msg to successor
-     */
-    public void sendUpdateMsgToSuccessor(boolean flag){
-        if(getVertexValue().hasPrevDest())
-            broadcastUpdateMsg(flag);
-    }
-    
-    /**
      * override sendUpdateMsg and use incomingMsg as parameter automatically
      */
     public void sendUpdateMsg(){
-        sendUpdateMsg(incomingMsg);
+        sendUpdateMsgForP2(incomingMsg);
     }
     
     public void sendFinalUpdateMsg(){
         outFlag |= MessageFlag.IS_FINAL;
-        sendUpdateMsg(incomingMsg);
+        sendUpdateMsgForP2(incomingMsg);
     }
     
     /**
      * send update message to neighber for P2
      */
-    public void sendUpdateMsg(MessageWritable msg){
+    public void sendUpdateMsgForP2(MessageWritable msg){
         outgoingMsg.reset();
         outgoingMsg.setUpdateMsg(true);
         byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
@@ -257,11 +197,11 @@
         switch(neighborToMeDir){
             case MessageFlag.DIR_FF:
             case MessageFlag.DIR_FR:
-                sendUpdateMsgToPredecessor(false);
+                sendUpdateMsg(isP2, toPredecessor);
                 break;
             case MessageFlag.DIR_RF:
             case MessageFlag.DIR_RR: 
-                sendUpdateMsgToSuccessor(false);
+                sendUpdateMsg(isP2, toSuccessor);
                 break;
         }
     }
@@ -271,16 +211,16 @@
         outgoingMsg.setUpdateMsg(true);
         switch(getVertexValue().getState() & MessageFlag.HEAD_CAN_MERGE_MASK){
             case MessageFlag.HEAD_CAN_MERGEWITHPREV:
-                sendUpdateMsgToSuccessor(false);
+                sendUpdateMsg(isP2, toSuccessor);
                 break;
             case MessageFlag.HEAD_CAN_MERGEWITHNEXT:
-                sendUpdateMsgToPredecessor(false);
+                sendUpdateMsg(isP2, toPredecessor);
                 break;
         }
     }
     
     public void sendMergeMsgToSuccessor(){
-        setSuccessorToMeDir();
+        setNeighborToMeDir(successorToMe);
         if(ifFlipWithPredecessor())
             outgoingMsg.setFlip(true);
         else
@@ -324,13 +264,25 @@
     }
     
     /**
+     * send MERGE msg
+     */
+    public void sendMergeMsg(boolean toPredecessor, VKmerBytesWritable mergeDest){
+        setNeighborToMeDir(predecessorToMe);
+        outgoingMsg.setFlag(outFlag);
+        outgoingMsg.setSourceVertexId(getVertexId());
+//        for(byte d: OutgoingListFlag.values)
+//            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
+        outgoingMsg.setNode(getVertexValue().getNode());
+        sendMsg(mergeDest, outgoingMsg);
+    }
+    
+    /**
      * configure MERGE msg  TODO: delete edgelist, merge configureMergeMsgForPredecessor and configureMergeMsgForPredecessorByIn...
      */
     public void configureMergeMsgForPredecessor(VKmerBytesWritable mergeDest){
-        setPredecessorToMeDir();
+        setNeighborToMeDir(predecessorToMe);
         outgoingMsg.setFlag(outFlag);
         outgoingMsg.setSourceVertexId(getVertexId());
-        outgoingMsg.setFlip(ifFilpWithSuccessor());
 //        for(byte d: OutgoingListFlag.values)
 //            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
         outgoingMsg.setNode(getVertexValue().getNode());
@@ -338,52 +290,25 @@
     }
     
     public void configureMergeMsgForSuccessor(VKmerBytesWritable mergeDest){
-        setSuccessorToMeDir();
+        setNeighborToMeDir(successorToMe);
         outgoingMsg.setFlag(outFlag);
         outgoingMsg.setSourceVertexId(getVertexId());
-        outgoingMsg.setFlip(ifFlipWithPredecessor()); // TODO seems incorrect for outgoing... why predecessor?  //TODO REMOVE this flip boolean completely
 //        for(byte d: IncomingListFlag.values)
 //            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
         outgoingMsg.setNode(getVertexValue().getNode());
         sendMsg(mergeDest, outgoingMsg);
     }
     
-//    /**
-//     * configure MERGE msg
-//     */
-//    public void configureMergeMsgForPredecessorByIncomingMsg(){
-//        setPredecessorToMeDir();
-//        outgoingMsg.setFlag(outFlag);        
-//        outgoingMsg.setFlip(ifFilpWithSuccessor());
-//        outgoingMsg.setSourceVertexId(getVertexId());
-//        for(byte d: OutgoingListFlag.values)
-//            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
-//        outgoingMsg.setNode(getVertexValue().getNode());
-////        outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
-//        sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-//    }
-//    
-//    public void configureMergeMsgForSuccessorByIncomingMsg(){
-//        setSuccessorToMeDir();
-//        outgoingMsg.setFlag(outFlag);
-//        outgoingMsg.setFlip(ifFlipWithPredecessor());
-//        outgoingMsg.setSourceVertexId(getVertexId());
-//        for(byte d: IncomingListFlag.values)
-//            outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
-//        outgoingMsg.setNode(getVertexValue().getNode());
-////        outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
-//        sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-//    }
     /**
-     * send merge message to neighber for P4
+     * send merge message to neighbor for P4, send message to the merge object and kill self
      */
-    public void broadcastMergeMsg(boolean deleteSelf){
+    public void broadcastMergeMsg(boolean isP4){
         outFlag |= getHeadMergeDir();
         switch(getVertexValue().getState() & State.CAN_MERGE_MASK) {
             case State.CAN_MERGEWITHNEXT:
                 // configure merge msg for successor
                 configureMergeMsgForSuccessor(getNextDestVertexId()); // TODO getDestVertexId(DIRECTION), then remove the switch statement, sendMergeMsg(DIRECTION)
-                if(deleteSelf)
+                if(isP4)
                     deleteVertex(getVertexId());
                 else{
                     getVertexValue().setState(State.IS_DEAD);
@@ -393,7 +318,7 @@
             case State.CAN_MERGEWITHPREV:
                 // configure merge msg for predecessor
                 configureMergeMsgForPredecessor(getPrevDestVertexId());
-                if(deleteSelf)
+                if(isP4)
                     deleteVertex(getVertexId());
                 else{
                     getVertexValue().setState(State.IS_DEAD);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 0683587..0e43c61 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -170,10 +170,10 @@
             byte headMergeDir = (byte)(getVertexValue().getState() & State.HEAD_CAN_MERGE_MASK);
             switch(headMergeDir){
                 case State.HEAD_CAN_MERGEWITHPREV:
-                    sendUpdateMsgToSuccessor(true);
+                    sendUpdateMsg(isP1, toSuccessor);
                     break;
                 case State.HEAD_CAN_MERGEWITHNEXT:
-                    sendUpdateMsgToPredecessor(true);
+                    sendUpdateMsg(isP1, toPredecessor);
                     break;
             }
         } else
@@ -186,7 +186,7 @@
     public void processUpdateOnceReceiveMsg(Iterator<PathMergeMessageWritable> msgIterator){
         while(msgIterator.hasNext()){
             incomingMsg = msgIterator.next();
-            processUpdate();
+            processUpdate(incomingMsg);
             if(isHaltNode())
                 voteToHalt();
             else
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index 865c1cc..0e67efc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -254,7 +254,7 @@
      * configure MERGE msg For P2
      */
     public void configureP2MergeMsgForPredecessor(VKmerBytesWritable mergeDest){
-        setPredecessorToMeDir();
+        setNeighborToMeDir(predecessorToMe);
         outgoingMsg.setFlag(outFlag);
         outgoingMsg.setSourceVertexId(getVertexId());
         outgoingMsg.setFlip(ifFilpWithSuccessor());
@@ -263,7 +263,7 @@
     }
     
     public void configureP2MergeMsgForSuccessor(VKmerBytesWritable mergeDest){
-        setSuccessorToMeDir();
+        setNeighborToMeDir(successorToMe);
         outgoingMsg.setFlag(outFlag);
         outgoingMsg.setSourceVertexId(getVertexId());
         outgoingMsg.setFlip(ifFlipWithPredecessor());
@@ -563,7 +563,7 @@
                         voteToHalt();
                     } else if(incomingMsg.isUpdateMsg() && (selfFlag == State.IS_OLDHEAD || isValidUpateNode())){// only old head update edges
                         if(!isHaltNode())
-                            processUpdate();
+                            processUpdate(incomingMsg);
                         voteToHalt();
                     } else if(isFinalMergeMsg()){// for final processing, receive msg from head, which means final merge (2) ex. 2, 8
                         sendFinalMergeMsg();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index c61e4b7..1e20fb6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -11,7 +11,6 @@
 import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
 import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
 import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
 import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
 import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
 import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -23,7 +22,7 @@
  */
 public class P4ForPathMergeVertex extends
     BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
-
+    
     private static long randSeed = 1; //static for save memory
     private float probBeingRandomHead = -1;
     private Random randGenerator = null;
@@ -89,6 +88,9 @@
     protected boolean setPrevInfo(VertexValueWritable value) {
         if(getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT)
             return false;
+        if (isTandemRepeat(value)) {
+            return false;
+        }
         if (value.inDegree() == 1) {
             for(byte dir : IncomingListFlag.values){
                 if(value.getEdgeList(dir).getCountOfPosition() > 0){
@@ -107,6 +109,9 @@
     protected boolean setNextInfo(VertexValueWritable value) {
         if(getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV)
             return false;
+        if (isTandemRepeat(value)) {
+            return false;
+        }
     	// TODO make sure the degree is correct
         if (value.outDegree() == 1) {
             for(byte dir : OutgoingListFlag.values){
@@ -119,7 +124,108 @@
         }
         return false;
     }
-
+    
+    /**
+     * step1 (runs when superstep % 4 == 3): decide this vertex's merge direction and send the matching update msg
+     */
+    public void sendUpdates(){
+        // reset the merge direction (state becomes no-merge) before deciding again
+        setStateAsNoMerge();
+        
+        // only PATH vertices are present. Find the IDs of my neighbors
+        curKmer = getVertexId();
+        curHead = isNodeRandomHead(curKmer);
+        
+        // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
+        // We prevent merging towards non-path nodes
+        hasNext = setNextInfo(getVertexValue());  // TODO make this false if the node is restricted by its neighbors or by structure(when you combine steps 2 and 3) 
+        hasPrev = setPrevInfo(getVertexValue());
+        if (hasNext || hasPrev) {
+            if (curHead) {
+                if (hasNext && !nextHead) {
+                    // compress this head to the forward tail; the update msg is sent with toPredecessor
+                    setStateAsMergeDir(mergeWithNext);
+                    sendUpdateMsg(isP4, toPredecessor);
+                } else if (hasPrev && !prevHead) {
+                    // compress this head to the reverse tail; the update msg is sent with toSuccessor
+                    setStateAsMergeDir(mergeWithPrev);
+                    sendUpdateMsg(isP4, toSuccessor);
+                } 
+            }
+            else {
+                // curHead is false: I'm a tail
+                if (hasNext && hasPrev) {
+                     if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
+                        // tails on both sides, and I'm the "local minimum" (my kmer compares below both neighbors)
+                        // compress me towards the tail in forward dir
+                        setStateAsMergeDir(mergeWithNext);
+                        sendUpdateMsg(isP4, toPredecessor);
+                    }
+                } else if (!hasPrev) {
+                    // no previous node
+                    if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
+                        // merge towards tail in forward dir (only when my kmer compares below the next kmer)
+                        setStateAsMergeDir(mergeWithNext);
+                        sendUpdateMsg(isP4, toPredecessor);
+                    }
+                } else if (!hasNext) {
+                    // no next node
+                    if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
+                        // merge towards tail in reverse dir (only when my kmer compares below the prev kmer)
+                        setStateAsMergeDir(mergeWithPrev);
+                        sendUpdateMsg(isP4, toSuccessor);
+                    }
+                }
+            }
+        }  // TODO else voteToHalt (when I combine steps 2 and 3)
+        this.activate();
+    }
+    
+    /**
+     * step2 (runs when superstep % 4 == 0): process every incoming update msg, then vote to halt or stay active
+     */
+    public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator){
+        // hand each incoming update msg to processUpdate
+        while (msgIterator.hasNext()) {
+            incomingMsg = msgIterator.next();
+            processUpdate(incomingMsg);
+        }
+        if(isInactiveNode() || isHeadUnableToMerge()) // check structure and neighbor restriction 
+            voteToHalt();
+        else
+            activate();
+    }
+    
+    /**
+     * step4 (runs when superstep % 4 == 2): process every incoming merge msg and decide whether this vertex halts or stays active
+     */
+    public void receiveMerges(Iterator<PathMergeMessageWritable> msgIterator){
+        // for each msg, selfFlag is read from the head merge dir BEFORE that msg is merged
+        while (msgIterator.hasNext()) {
+            boolean selfFlag = (getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV || getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT);
+            incomingMsg = msgIterator.next();
+            /** process merge **/
+            processMerge(incomingMsg);
+            // set statistics counter: Num_MergedNodes
+            updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
+            /** if it's a tandem repeat, which means detecting cycle **/
+            if(isTandemRepeat(getVertexValue())){
+                // set statistics counter: Num_Cycles
+                updateStatisticsCounter(StatisticsCounter.Num_Cycles); 
+                voteToHalt();  // TODO make sure you're checking structure to preclude tandem repeats
+            }/** head meets head, stop **/ 
+            else if(!VertexUtil.isCanMergeVertex(getVertexValue()) || isHeadMeetsHead(selfFlag)){
+                getVertexValue().setState(State.HEAD_CANNOT_MERGE);
+                // set statistics counter: Num_MergedPaths
+                updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
+                voteToHalt();
+            }else{
+                activate();
+            }
+            getVertexValue().setCounters(counters);
+        }
+    }
+    
     @Override
     public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
         initVertex();
@@ -127,111 +233,14 @@
             startSendMsg();
         else if (getSuperstep() == 2)
             initState(msgIterator);
-        else if (getSuperstep() % 4 == 3){ 
-        	// TODO separate function for this block
-            //initiate merge_dir
-            setStateAsNoMerge();
-            
-            // only PATH vertices are present. Find the ID's for my neighbors
-            curKmer = getVertexId();
-            curHead = isNodeRandomHead(curKmer);
-            
-            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
-            // We prevent merging towards non-path nodes
-            hasNext = setNextInfo(getVertexValue());  // TODO make this false if the node is restricted by its neighbors or by structure 
-            hasPrev = setPrevInfo(getVertexValue());
-            if (isTandemRepeat(getVertexValue())) {
-                hasNext = false;
-                hasPrev = false;
-            }
-            if (hasNext || hasPrev) {
-                if (curHead) {
-                    if (hasNext && !nextHead) {
-                        // compress this head to the forward tail
-                        setStateAsMergeWithNext();
-//                        configureUpdateMsg(isNextOrPrevious?, getVertexValue()); // TODO change to sendmsg or sendUpdateFrom...
-                		sendUpdateMsgToPredecessor(true); //TODO all of these can be simplified
-                    } else if (hasPrev && !prevHead) {
-                        // compress this head to the reverse tail
-                        setStateAsMergeWithPrev();
-                        sendUpdateMsgToSuccessor(true);
-                    } 
-                }
-                else {
-                    // I'm a tail
-                    if (hasNext && hasPrev) {
-                         if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
-                            // tails on both sides, and I'm the "local minimum"
-                            // compress me towards the tail in forward dir
-                            setStateAsMergeWithNext();
-                            sendUpdateMsgToPredecessor(true);
-                        }
-                    } else if (!hasPrev) {
-                        // no previous node
-                        if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
-                            // merge towards tail in forward dir
-                            setStateAsMergeWithNext();
-                            sendUpdateMsgToPredecessor(true);
-                        }
-                    } else if (!hasNext) {
-                        // no next node
-                        if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
-                            // merge towards tail in reverse dir
-                            setStateAsMergeWithPrev();
-                            sendUpdateMsgToSuccessor(true);
-                        }
-                    }
-                }
-            }  // TODO else voteToHalt (when you combine steps 2 and 3)
-            this.activate();
-        }
-        else if (getSuperstep() % 4 == 0){  
-        	// TODO separate function for this step
-            //update neighber
-            while (msgIterator.hasNext()) {
-                incomingMsg = msgIterator.next();
-                processUpdate();  // TODO pass incomingMsg as a parameter
-                
-                // TODO move outside the loop
-                if(isInactiveNode() || isHeadUnableToMerge())  // check structure and neighbor restriction 
-                    voteToHalt();
-                else
-                    activate();
-            }
-        } else if (getSuperstep() % 4 == 1){
-            //send message to the merge object and kill self
+        else if (getSuperstep() % 4 == 3)
+            sendUpdates();
+        else if (getSuperstep() % 4 == 0)  
+            receiveUpdates(msgIterator);
+        else if (getSuperstep() % 4 == 1){
             broadcastMergeMsg(true);
-        } else if (getSuperstep() % 4 == 2){
-            //merge tmpKmer
-            while (msgIterator.hasNext()) {
-                incomingMsg = msgIterator.next();
-                boolean selfFlag = (getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV || getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT);
-                /** process merge **/
-                processMerge(); // TODO use incomingMsg as a parameter
-                // set statistics counter: Num_MergedNodes
-                updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
-                /** if it's a tandem repeat, which means detecting cycle **/
-                if(isTandemRepeat(getVertexValue())){  // TODO check 3 node cycle to make sure the update is cocrect (try several times) 
-                    for(byte d : DirectionFlag.values)
-                        getVertexValue().getEdgeList(d).reset(); // TODO don't remove tandem repeats but DO stop merging  // we shouldn't need to update neighbors 
-                    // set statistics counter: Num_TandemRepeats
-                    updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats); // TODO cycle instead of tandem repeat
-                    getVertexValue().setCounters(counters);
-                    voteToHalt();  // TODO make sure you're checking structure to preclude tandem repeats
-                }/** head meets head, stop **/ 
-                else if(!VertexUtil.isCanMergeVertex(getVertexValue()) || isHeadMeetsHead(selfFlag)){
-                    getVertexValue().setState(State.HEAD_CANNOT_MERGE);
-                    // set statistics counter: Num_MergedPaths
-                    updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
-                    getVertexValue().setCounters(counters);
-                    voteToHalt();
-                }
-                else{
-                    getVertexValue().setCounters(counters); // TODO move all the setCounter calls outside the if/else blocks
-                    activate();
-                }
-            }
-        }
+        } else if (getSuperstep() % 4 == 2)
+            receiveMerges(msgIterator);
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
index 98f82d3..ee2a683 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
@@ -10,6 +10,7 @@
     public static final byte Num_RemovedBridges = 0b0110 << 0;
     public static final byte Num_SplitRepeats = 0b0111 << 0;
     public static final byte Num_Scaffodings = 0b1000 << 0;
+    public static final byte Num_Cycles = 0b1001 << 0;
     
     public final static class COUNTER_CONTENT{
         public static String getContent(byte code){
@@ -42,6 +43,9 @@
                 case Num_Scaffodings:
                     r = "num of scaffoldings";
                     break;
+                case Num_Cycles:
+                    r = "num of cycles";
+                    break;
             }
             return r;
         }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
index 13f264a..71444a6 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
@@ -6,22 +6,22 @@
 
     public static Test suite() throws Exception {
         String pattern ="PathMerge"; 
-        String testSet[] = {
-                "2", "3", "4", "5", "6", "7", "8", "9", "head_6", "head_7",
-                "P2_3", "P2_4", "P2_5", "P2_6", "P2_7", "P2_8",
-                "LeftAdj", "RightAdj",
-                "FR", "RF", "head_FR", "head_RF", "twohead_FR", "twohead_RF",
-                "SelfTandemRepeat", "TandemRepeatWithMergeEdge", 
-                "TandemRepeatWithUnmergeEdge", "ComplexTandemRepeat",
-                "SimplePath", "ThreeDuplicate",
-                "SimpleBridgePath", "BridgePathWithTandemRepeat",
-                "RingPath", "CyclePath",
-                "SimpleTreePath", "ComplexTreePath",
-                "Triangle", "Rectangle", 
-                "synthetic",
-                "MultiTandemRepeat", "MultiTandemRepeat2", "MultiTandemRepeat3",
-                "TandemRepeatWithSmallCycle", "TandemRepeatAndCycle",
-                "sameWithEdge", "FR_RF_Simple"
+        String testSet[] = {"ThreeNodesCycle", "RingPath", "SimpleTreePath", "RingPath", "CyclePath", "9", "P2_8", "ComplexTandemRepeat"
+//                "2", "3", "4", "5", "6", "7", "8", "9", "head_6", "head_7",
+//                "P2_3", "P2_4", "P2_5", "P2_6", "P2_7", "P2_8",
+//                "LeftAdj", "RightAdj",
+//                "FR", "RF", "head_FR", "head_RF", "twohead_FR", "twohead_RF",
+//                "SelfTandemRepeat", "TandemRepeatWithMergeEdge", 
+//                "TandemRepeatWithUnmergeEdge", "ComplexTandemRepeat",
+//                "SimplePath", "ThreeDuplicate",
+//                "SimpleBridgePath", "BridgePathWithTandemRepeat",
+//                "RingPath", "CyclePath",
+//                "SimpleTreePath", "ComplexTreePath",
+//                "Triangle", "Rectangle", 
+//                "synthetic",
+//                "MultiTandemRepeat", "MultiTandemRepeat2", "MultiTandemRepeat3",
+//                "TandemRepeatWithSmallCycle", "TandemRepeatAndCycle",
+//                "sameWithEdge", "FR_RF_Simple"
 //                "SmallGenome_5",
 
         };