Merge branch 'wbiesing/debug' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index fbcceaa..0582e08 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.data.Marshal;
-import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
public class EdgeListWritable implements WritableComparable<EdgeListWritable>, Serializable, Iterable<EdgeWritable>{
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 700b3b5..e82c37d 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -559,7 +559,10 @@
public int outDegree() {
return edges[DirectionFlag.DIR_FF].getCountOfPosition() + edges[DirectionFlag.DIR_FR].getCountOfPosition();
}
-
+
+ public int getDegree(boolean prev){
+ return prev ? inDegree() : outDegree();
+ }
/*
* Return if this node is a "path" compressible node, that is, it has an
* in-degree and out-degree of 1
diff --git a/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3 b/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3
new file mode 100644
index 0000000..0991038
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/PathMerge_TestSet/ThreeNodesCycle/3
@@ -0,0 +1 @@
+1 ACAACA
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
index 988a88b..5b76446 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTestSuite.java
@@ -32,7 +32,7 @@
// + "P2_8"
// + "SmallCycle"
// + "sameWithEdge"
- + "FR_RF_Simple"
+ + "ThreeNodesCycle"
};
// + "SimpleRectangle", PreFix + File.separator
// + "MediumRectangle", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc
new file mode 100644
index 0000000..891c286
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/..binmerge.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge
new file mode 100755
index 0000000..365db6a
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/.binmerge
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc
new file mode 100644
index 0000000..05bf27f
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/.part-00000.crc
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000 b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000
new file mode 100755
index 0000000..a48ba91
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/bin/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data
new file mode 100644
index 0000000..83a9a62
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/data
@@ -0,0 +1,3 @@
+CAA {[{AAC:[1]}] [] [] [{ACA:[1]}] {5':[], ~5':[]} 1.0x}
+ACA {[{CAA:[1]}] [] [] [{AAC:[1]}] {5':[(1-0_0)], ~5':[]} 2.0x}
+AAC {[{ACA:[1]}] [] [] [{CAA:[1]}] {5':[], ~5':[]} 1.0x}
diff --git a/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps
new file mode 100644
index 0000000..073789d
--- /dev/null
+++ b/genomix/genomix-pregelix/data/TestSet/PathMerge/ThreeNodesCycle/graphviz/result.ps
@@ -0,0 +1,463 @@
+%!PS-Adobe-3.0
+%%Creator: graphviz version 2.26.3 (20100126.1600)
+%%Title: G
+%%Pages: (atend)
+%%BoundingBox: (atend)
+%%EndComments
+save
+%%BeginProlog
+/DotDict 200 dict def
+DotDict begin
+
+/setupLatin1 {
+mark
+/EncodingVector 256 array def
+ EncodingVector 0
+
+ISOLatin1Encoding 0 255 getinterval putinterval
+EncodingVector 45 /hyphen put
+
+% Set up ISO Latin 1 character encoding
+/starnetISO {
+ dup dup findfont dup length dict begin
+ { 1 index /FID ne { def }{ pop pop } ifelse
+ } forall
+ /Encoding EncodingVector def
+ currentdict end definefont
+} def
+/Times-Roman starnetISO def
+/Times-Italic starnetISO def
+/Times-Bold starnetISO def
+/Times-BoldItalic starnetISO def
+/Helvetica starnetISO def
+/Helvetica-Oblique starnetISO def
+/Helvetica-Bold starnetISO def
+/Helvetica-BoldOblique starnetISO def
+/Courier starnetISO def
+/Courier-Oblique starnetISO def
+/Courier-Bold starnetISO def
+/Courier-BoldOblique starnetISO def
+cleartomark
+} bind def
+
+%%BeginResource: procset graphviz 0 0
+/coord-font-family /Times-Roman def
+/default-font-family /Times-Roman def
+/coordfont coord-font-family findfont 8 scalefont def
+
+/InvScaleFactor 1.0 def
+/set_scale {
+ dup 1 exch div /InvScaleFactor exch def
+ scale
+} bind def
+
+% styles
+/solid { [] 0 setdash } bind def
+/dashed { [9 InvScaleFactor mul dup ] 0 setdash } bind def
+/dotted { [1 InvScaleFactor mul 6 InvScaleFactor mul] 0 setdash } bind def
+/invis {/fill {newpath} def /stroke {newpath} def /show {pop newpath} def} bind def
+/bold { 2 setlinewidth } bind def
+/filled { } bind def
+/unfilled { } bind def
+/rounded { } bind def
+/diagonals { } bind def
+
+% hooks for setting color
+/nodecolor { sethsbcolor } bind def
+/edgecolor { sethsbcolor } bind def
+/graphcolor { sethsbcolor } bind def
+/nopcolor {pop pop pop} bind def
+
+/beginpage { % i j npages
+ /npages exch def
+ /j exch def
+ /i exch def
+ /str 10 string def
+ npages 1 gt {
+ gsave
+ coordfont setfont
+ 0 0 moveto
+ (\() show i str cvs show (,) show j str cvs show (\)) show
+ grestore
+ } if
+} bind def
+
+/set_font {
+ findfont exch
+ scalefont setfont
+} def
+
+% draw text fitted to its expected width
+/alignedtext { % width text
+ /text exch def
+ /width exch def
+ gsave
+ width 0 gt {
+ [] 0 setdash
+ text stringwidth pop width exch sub text length div 0 text ashow
+ } if
+ grestore
+} def
+
+/boxprim { % xcorner ycorner xsize ysize
+ 4 2 roll
+ moveto
+ 2 copy
+ exch 0 rlineto
+ 0 exch rlineto
+ pop neg 0 rlineto
+ closepath
+} bind def
+
+/ellipse_path {
+ /ry exch def
+ /rx exch def
+ /y exch def
+ /x exch def
+ matrix currentmatrix
+ newpath
+ x y translate
+ rx ry scale
+ 0 0 1 0 360 arc
+ setmatrix
+} bind def
+
+/endpage { showpage } bind def
+/showpage { } def
+
+/layercolorseq
+ [ % layer color sequence - darkest to lightest
+ [0 0 0]
+ [.2 .8 .8]
+ [.4 .8 .8]
+ [.6 .8 .8]
+ [.8 .8 .8]
+ ]
+def
+
+/layerlen layercolorseq length def
+
+/setlayer {/maxlayer exch def /curlayer exch def
+ layercolorseq curlayer 1 sub layerlen mod get
+ aload pop sethsbcolor
+ /nodecolor {nopcolor} def
+ /edgecolor {nopcolor} def
+ /graphcolor {nopcolor} def
+} bind def
+
+/onlayer { curlayer ne {invis} if } def
+
+/onlayers {
+ /myupper exch def
+ /mylower exch def
+ curlayer mylower lt
+ curlayer myupper gt
+ or
+ {invis} if
+} def
+
+/curlayer 0 def
+
+%%EndResource
+%%EndProlog
+%%BeginSetup
+14 default-font-family set_font
+1 setmiterlimit
+% /arrowlength 10 def
+% /arrowwidth 5 def
+
+% make sure pdfmark is harmless for PS-interpreters other than Distiller
+/pdfmark where {pop} {userdict /pdfmark /cleartomark load put} ifelse
+% make '<<' and '>>' safe on PS Level 1 devices
+/languagelevel where {pop languagelevel}{1} ifelse
+2 lt {
+ userdict (<<) cvn ([) cvn load put
+ userdict (>>) cvn ([) cvn load put
+} if
+
+%%EndSetup
+setupLatin1
+%%Page: 1 1
+%%PageBoundingBox: 36 36 380 218
+%%PageOrientation: Portrait
+0 0 1 beginpage
+gsave
+36 36 344 182 boxprim clip newpath
+1 1 set_scale 0 rotate 40 41 translate
+% CAA
+gsave
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 0 moveto
+0 104 lineto
+56 104 lineto
+56 0 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+12.5 87.4 moveto 31 (CAA) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 78 moveto
+56 78 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+14 61.4 moveto 28 (5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 52 moveto
+56 52 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+8 35.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 0 26 moveto
+56 26 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+16.5 9.4 moveto 23 (1.0) alignedtext
+grestore
+% AAC
+gsave
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 70 moveto
+140 174 lineto
+196 174 lineto
+196 70 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+152.5 157.4 moveto 31 (AAC) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 148 moveto
+196 148 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+154 131.4 moveto 28 (5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 122 moveto
+196 122 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+148 105.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+0 0 0 nodecolor
+newpath 140 96 moveto
+196 96 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+156.5 79.4 moveto 23 (1.0) alignedtext
+grestore
+% CAA->AAC
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 56.09 79.53 moveto
+61.72 84.13 67.82 88.52 74 92 curveto
+91.22 101.69 112.01 108.74 129.66 113.56 curveto
+stroke
+0 0 0 edgecolor
+newpath 129.07 117.02 moveto
+139.63 116.14 lineto
+130.83 110.24 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 129.07 117.02 moveto
+139.63 116.14 lineto
+130.83 110.24 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+75.5 116.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA
+gsave
+0 0 0.75294 nodecolor
+newpath 280 2 moveto
+280 106 lineto
+336 106 lineto
+336 2 lineto
+closepath fill
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 2 moveto
+280 106 lineto
+336 106 lineto
+336 2 lineto
+closepath stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+293 89.4 moveto 30 (ACA) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 80 moveto
+336 80 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+289.5 63.4 moveto 37 (5':[1]) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 54 moveto
+336 54 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+288 37.4 moveto 40 (~5':[]) alignedtext
+1 setlinewidth
+filled
+0 0 0 nodecolor
+newpath 280 28 moveto
+336 28 lineto
+stroke
+0 0 0 nodecolor
+14 /Times-Roman set_font
+296.5 11.4 moveto 23 (2.0) alignedtext
+grestore
+% CAA->ACA
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 56.23 48.72 moveto
+89.61 45.25 146.84 40.59 196 43 curveto
+220.65 44.21 248.25 46.9 269.88 49.32 curveto
+stroke
+0 1 1 edgecolor
+newpath 269.65 52.82 moveto
+279.98 50.48 lineto
+270.44 45.86 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 269.65 52.82 moveto
+279.98 50.48 lineto
+270.44 45.86 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+144 48.4 moveto 48 (RR: [1]) alignedtext
+grestore
+% AAC->CAA
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 139.97 84.05 moveto
+134.54 78.66 128.49 73.7 122 70 curveto
+105.22 60.44 84.31 55.85 66.47 53.69 curveto
+stroke
+0 1 1 edgecolor
+newpath 66.68 50.19 moveto
+56.38 52.68 lineto
+65.98 57.16 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 66.68 50.19 moveto
+56.38 52.68 lineto
+65.98 57.16 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+74 75.4 moveto 48 (RR: [1]) alignedtext
+grestore
+% AAC->ACA
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 196.05 117.81 moveto
+215.46 114.07 241.4 107.38 262 96 curveto
+265.42 94.11 268.78 91.92 272.05 89.55 curveto
+stroke
+0 0 0 edgecolor
+newpath 274.22 92.29 moveto
+279.92 83.36 lineto
+269.9 86.79 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 274.22 92.29 moveto
+279.92 83.36 lineto
+269.9 86.79 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+215.5 118.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA->CAA
+gsave
+1 setlinewidth
+0 0 0 edgecolor
+newpath 279.99 42.97 moveto
+257.65 34.8 225.35 24.35 196 20 curveto
+171.38 16.35 164.65 16.55 140 20 curveto
+114.76 23.54 87.22 31.35 65.77 38.37 curveto
+stroke
+0 0 0 edgecolor
+newpath 64.44 35.12 moveto
+56.07 41.62 lineto
+66.66 41.76 lineto
+closepath fill
+1 setlinewidth
+solid
+0 0 0 edgecolor
+newpath 64.44 35.12 moveto
+56.07 41.62 lineto
+66.66 41.76 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+145.5 25.4 moveto 45 (FF: [1]) alignedtext
+grestore
+% ACA->AAC
+gsave
+1 setlinewidth
+0 1 1 edgecolor
+newpath 279.87 57.34 moveto
+260.42 60.54 234.47 66.64 214 78 curveto
+210.56 79.91 207.2 82.13 203.94 84.55 curveto
+stroke
+0 1 1 edgecolor
+newpath 201.69 81.87 moveto
+196.12 90.89 lineto
+206.09 87.31 lineto
+closepath fill
+1 setlinewidth
+solid
+0 1 1 edgecolor
+newpath 201.69 81.87 moveto
+196.12 90.89 lineto
+206.09 87.31 lineto
+closepath stroke
+0 0 0 edgecolor
+14 /Times-Roman set_font
+214 83.4 moveto 48 (RR: [1]) alignedtext
+grestore
+endpage
+showpage
+grestore
+%%PageTrailer
+%%EndPage: 1
+%%Trailer
+%%Pages: 1
+%%BoundingBox: 36 36 380 218
+end
+restore
+%%EOF
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index dd2f08c..e60c457 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -11,7 +11,6 @@
import edu.uci.ics.genomix.type.EdgeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexValueWritable
extends NodeWritable{
@@ -250,14 +249,14 @@
}
- /**
- * Process any changes to value. This is for edge updates. nodeToAdd should be only edge
- */
- public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
- // TODO remove this function (use updateEdges)
- byte replaceDir = mirrorDirection(deleteDir);
- this.getNode().updateEdges(deleteDir, toDelete, updateDir, replaceDir, other, true);
- }
+// /**
+// * Process any changes to value. This is for edge updates. nodeToAdd should be only edge
+// */
+// public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
+// // TODO remove this function (use updateEdges)
+// byte replaceDir = mirrorDirection(deleteDir);
+// this.getNode().updateEdges(deleteDir, toDelete, updateDir, replaceDir, other, true);
+// }
public void processFinalUpdates(byte deleteDir, byte updateDir, NodeWritable other){
byte replaceDir = mirrorDirection(deleteDir);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 0d5448a..f29513a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -25,9 +25,11 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.EdgeListWritable;
+import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
public abstract class BasicGraphCleanVertex<V extends VertexValueWritable, M extends MessageWritable> extends
Vertex<VKmerBytesWritable, V, NullWritable, M> {
@@ -510,6 +512,24 @@
return true;
}
+
+ /**
+ * check if A needs to be flipped with its neighbor
+ */
+ public boolean ifFlipWithNeighbor(boolean withPrecessor){
+ if(withPrecessor){
+ if(getVertexValue().getRRList().isEmpty())
+ return true;
+ else
+ return false;
+ } else{
+ if(getVertexValue().getFFList().isEmpty())
+ return true;
+ else
+ return false;
+ }
+ }
+
/**
* check if A need to be filpped with predecessor
*/
@@ -545,42 +565,19 @@
}
/**
- * set adjMessage to predecessor(from successor)
+ * set neighborToMe Dir
*/
- public void setPredecessorToMeDir(){
+ public void setNeighborToMeDir(boolean predecessorToMe){
+ if(getVertexValue().getDegree(predecessorToMe) != 1)
+ throw new IllegalArgumentException("In merge dir, the degree is not 1");
+ byte[] dirs = predecessorToMe ? IncomingListFlag.values : OutgoingListFlag.values;
outFlag &= MessageFlag.DIR_CLEAR;
- if(!getVertexValue().getRFList().isEmpty())
- outFlag |= MessageFlag.DIR_RF;
- else if(!getVertexValue().getRRList().isEmpty())
- outFlag |= MessageFlag.DIR_RR;
- }
-
- public void setPredecessorToMeDir(VKmerBytesWritable toFind){
- outFlag &= MessageFlag.DIR_CLEAR; // TODO WHAT HAPPENS IF THE NODE IS IN YOUR R AND YOUR RF?
- if(getVertexValue().getRFList().contains(toFind))
- outFlag |= MessageFlag.DIR_RF;
- else if(getVertexValue().getRRList().contains(toFind))
- outFlag |= MessageFlag.DIR_RR;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorToMeDir(){
- outFlag &= MessageFlag.DIR_CLEAR;
- if(!getVertexValue().getFFList().isEmpty()) // TODO == 1 rather than isEmpty
- outFlag |= MessageFlag.DIR_FF;
- else if(!getVertexValue().getFRList().isEmpty())
- outFlag |= MessageFlag.DIR_FR;
- // TODO else exception
- }
-
- public void setSuccessorToMeDir(VKmerBytesWritable toFind){ // TODO you should never have to FIND...
- outFlag &= MessageFlag.DIR_CLEAR;
- if(getVertexValue().getFFList().contains(toFind))
- outFlag |= MessageFlag.DIR_FF;
- else if(getVertexValue().getFRList().contains(toFind))
- outFlag |= MessageFlag.DIR_FR;
+
+ if(getVertexValue().getEdgeList(dirs[0]).getCountOfPosition() == 1){
+ outFlag |= dirs[0];
+ } else if(getVertexValue().getEdgeList(dirs[1]).getCountOfPosition() == 1){
+ outFlag |= dirs[1];
+ }
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 16b7ab3..f3d4d8f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -14,7 +14,25 @@
public abstract class BasicPathMergeVertex<V extends VertexValueWritable, M extends PathMergeMessageWritable> extends
BasicGraphCleanVertex<V, M>{
-
+ protected static final boolean isP1 = true;
+ protected static final boolean isP2 = false;
+ protected static final boolean isP4 = true;
+
+ protected static final boolean toPredecessor = true;
+ protected static final boolean toSuccessor = false;
+ protected static final boolean mergeWithPrev = true;
+ protected static final boolean mergeWithNext = false;
+ protected static final boolean predecessorToMe = true;
+ protected static final boolean successorToMe = false;
+
+ public void setStateAsMergeDir(boolean mergeWithPre){
+ short state = getVertexValue().getState();
+ state &= State.CAN_MERGE_CLEAR;
+ state |= mergeWithPre ? State.CAN_MERGEWITHPREV : State.CAN_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ activate();
+ }
+
public void setStateAsMergeWithPrev(){
short state = getVertexValue().getState();
state &= State.CAN_MERGE_CLEAR;
@@ -34,17 +52,17 @@
/**
* updateAdjList
*/
- public void processUpdate(){
+ public void processUpdate(M msg){
// A -> B -> C with B merging with C
- inFlag = incomingMsg.getFlag();
+ inFlag = msg.getFlag();
byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK); // A -> B dir
byte neighborToMeDir = mirrorDirection(meToNeighborDir); // B -> A dir
// TODO if you want, this logic could be figured out when sending the update from B
- byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip()); // A -> C after the merge
- // TODO add C -> A dir and call node.updateEdges directly
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, incomingMsg.getNode());
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip()); // A -> C after the merge
+ byte replaceDir = mirrorDirection(neighborToMeDir); // C -> A dir
+ getVertexValue().getNode().updateEdges(neighborToMeDir, msg.getSourceVertexId(),
+ neighborToMergeDir, replaceDir, msg.getNode(), true);
}
/**
@@ -73,13 +91,6 @@
getVertexValue().getEdgeList(neighborToMeDir).unionAdd(edge);
}
- /**
- * merge and updateAdjList merge with one neighbor
- */
- public void processMerge(){ // TODO remove me
- processMerge(incomingMsg);
- }
-
public byte flipHeadMergeDir(byte d, boolean isFlip){
if(isFlip){
switch(d){
@@ -133,123 +144,52 @@
getVertexValue().processMerges(neighborToMeDir, msg.getNode(), kmerSize);
}
+
/**
- * configure UPDATE msg boolean: true == P4, false == P2
+ * send UPDATE msg boolean: true == P4, false == P2
*/
- public void configureUpdateMsgForPredecessor(boolean isP4){
+ public void sendUpdateMsg(boolean isP4, boolean toPredecessor){
outgoingMsg.setSourceVertexId(getVertexId());
- // TODO pass in isForward
- for(byte d: OutgoingListFlag.values)
- outgoingMsg.getNode().setEdgeList(d, getVertexValue().getEdgeList(d)); // TODO check
-
// TODO pass in the vertexId rather than isP4 (removes this blockļ¼
if(isP4)
- outgoingMsg.setFlip(ifFilpWithSuccessor());
+ outgoingMsg.setFlip(ifFlipWithNeighbor(!toPredecessor)); //ifFilpWithSuccessor()
else
outgoingMsg.setFlip(ifFilpWithSuccessor(incomingMsg.getSourceVertexId()));
- kmerIterator = getVertexValue().getRFList().getKeys();
- while(kmerIterator.hasNext()){
- outFlag &= MessageFlag.DIR_CLEAR;
- outFlag |= MessageFlag.DIR_RF;
- outgoingMsg.setFlag(outFlag);
- destVertexId.setAsCopy(kmerIterator.next());
- // TODO DON'T NEED TO SEARCH for this
-// setPredecessorToMeDir(destVertexId);
- sendMsg(destVertexId, outgoingMsg);
- }
- kmerIterator = getVertexValue().getRRList().getKeys();
- while(kmerIterator.hasNext()){
- outFlag &= MessageFlag.DIR_CLEAR;
- outFlag |= MessageFlag.DIR_RR;
- outgoingMsg.setFlag(outFlag);
- destVertexId.setAsCopy(kmerIterator.next());
-// setPredecessorToMeDir(destVertexId);
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- public void configureUpdateMsgForSuccessor(boolean flag){
- outgoingMsg.setSourceVertexId(getVertexId());
- for(byte d: IncomingListFlag.values)
- outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
+ byte[] mergeDirs = toPredecessor ? OutgoingListFlag.values : IncomingListFlag.values;
+ byte[] updateDirs = toPredecessor ? IncomingListFlag.values : OutgoingListFlag.values;
- if(flag)
- outgoingMsg.setFlip(ifFlipWithPredecessor());
- else
- outgoingMsg.setFlip(ifFlipWithPredecessor(incomingMsg.getSourceVertexId()));
+ for(byte dir : mergeDirs)
+ outgoingMsg.getNode().setEdgeList(dir, getVertexValue().getEdgeList(dir));
- kmerIterator = getVertexValue().getFFList().getKeys();
- while(kmerIterator.hasNext()){
- outFlag &= MessageFlag.DIR_CLEAR;
- outFlag |= MessageFlag.DIR_FF;
- outgoingMsg.setFlag(outFlag);
- destVertexId.setAsCopy(kmerIterator.next());
-// setSuccessorToMeDir(destVertexId);
- sendMsg(destVertexId, outgoingMsg);
- }
- kmerIterator = getVertexValue().getFRList().getKeys();
- while(kmerIterator.hasNext()){
- outFlag &= MessageFlag.DIR_CLEAR;
- outFlag |= MessageFlag.DIR_FR;
- outgoingMsg.setFlag(outFlag);
- destVertexId.setAsCopy(kmerIterator.next());
-// setSuccessorToMeDir(destVertexId);
- sendMsg(destVertexId, outgoingMsg);
+ for(byte dir : updateDirs){
+ kmerIterator = getVertexValue().getEdgeList(dir).getKeys();
+ while(kmerIterator.hasNext()){
+ outFlag &= MessageFlag.DIR_CLEAR;
+ outFlag |= dir;
+ outgoingMsg.setFlag(outFlag);
+ destVertexId.setAsCopy(kmerIterator.next()); //TODO does destVertexId need deep copy?
+ sendMsg(destVertexId, outgoingMsg);
+ }
}
}
-
- /**
- * send update message to neighber boolean: true == P4, false == P2
- */
- public void broadcastUpdateMsg(boolean flag){
-// if((getVertexValue().getState() & State.VERTEX_MASK) == State.IS_HEAD && (outFlag & State.VERTEX_MASK) != State.IS_FINAL)
-// outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & State.CAN_MERGE_MASK){
- case State.CAN_MERGEWITHPREV:
- /** confugure updateMsg for successor **/
- configureUpdateMsgForSuccessor(flag);
- break;
- case State.CAN_MERGEWITHNEXT:
- /** confugure updateMsg for predecessor **/
- configureUpdateMsgForPredecessor(flag);
- break;
- }
- }
-
/**
- * This vertex tries to merge with next vertex and send update msg to predecesspr
- */
- public void sendUpdateMsgToPredecessor(boolean flag){
- if(getVertexValue().hasNextDest()) //TODO delete
- broadcastUpdateMsg(flag);
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to successor
- */
- public void sendUpdateMsgToSuccessor(boolean flag){
- if(getVertexValue().hasPrevDest())
- broadcastUpdateMsg(flag);
- }
-
- /**
* override sendUpdateMsg and use incomingMsg as parameter automatically
*/
public void sendUpdateMsg(){
- sendUpdateMsg(incomingMsg);
+ sendUpdateMsgForP2(incomingMsg);
}
public void sendFinalUpdateMsg(){
outFlag |= MessageFlag.IS_FINAL;
- sendUpdateMsg(incomingMsg);
+ sendUpdateMsgForP2(incomingMsg);
}
/**
* send update message to neighber for P2
*/
- public void sendUpdateMsg(MessageWritable msg){
+ public void sendUpdateMsgForP2(MessageWritable msg){
outgoingMsg.reset();
outgoingMsg.setUpdateMsg(true);
byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
@@ -257,11 +197,11 @@
switch(neighborToMeDir){
case MessageFlag.DIR_FF:
case MessageFlag.DIR_FR:
- sendUpdateMsgToPredecessor(false);
+ sendUpdateMsg(isP2, toPredecessor);
break;
case MessageFlag.DIR_RF:
case MessageFlag.DIR_RR:
- sendUpdateMsgToSuccessor(false);
+ sendUpdateMsg(isP2, toSuccessor);
break;
}
}
@@ -271,16 +211,16 @@
outgoingMsg.setUpdateMsg(true);
switch(getVertexValue().getState() & MessageFlag.HEAD_CAN_MERGE_MASK){
case MessageFlag.HEAD_CAN_MERGEWITHPREV:
- sendUpdateMsgToSuccessor(false);
+ sendUpdateMsg(isP2, toSuccessor);
break;
case MessageFlag.HEAD_CAN_MERGEWITHNEXT:
- sendUpdateMsgToPredecessor(false);
+ sendUpdateMsg(isP2, toPredecessor);
break;
}
}
public void sendMergeMsgToSuccessor(){
- setSuccessorToMeDir();
+ setNeighborToMeDir(successorToMe);
if(ifFlipWithPredecessor())
outgoingMsg.setFlip(true);
else
@@ -324,13 +264,25 @@
}
/**
+ * send MERGE msg
+ */
+ public void sendMergeMsg(boolean toPredecessor, VKmerBytesWritable mergeDest){
+ setNeighborToMeDir(predecessorToMe);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+// for(byte d: OutgoingListFlag.values)
+// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
+ outgoingMsg.setNode(getVertexValue().getNode());
+ sendMsg(mergeDest, outgoingMsg);
+ }
+
+ /**
* configure MERGE msg TODO: delete edgelist, merge configureMergeMsgForPredecessor and configureMergeMsgForPredecessorByIn...
*/
public void configureMergeMsgForPredecessor(VKmerBytesWritable mergeDest){
- setPredecessorToMeDir();
+ setNeighborToMeDir(predecessorToMe);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlip(ifFilpWithSuccessor());
// for(byte d: OutgoingListFlag.values)
// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
outgoingMsg.setNode(getVertexValue().getNode());
@@ -338,52 +290,25 @@
}
public void configureMergeMsgForSuccessor(VKmerBytesWritable mergeDest){
- setSuccessorToMeDir();
+ setNeighborToMeDir(successorToMe);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlip(ifFlipWithPredecessor()); // TODO seems incorrect for outgoing... why predecessor? //TODO REMOVE this flip boolean completely
// for(byte d: IncomingListFlag.values)
// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
outgoingMsg.setNode(getVertexValue().getNode());
sendMsg(mergeDest, outgoingMsg);
}
-// /**
-// * configure MERGE msg
-// */
-// public void configureMergeMsgForPredecessorByIncomingMsg(){
-// setPredecessorToMeDir();
-// outgoingMsg.setFlag(outFlag);
-// outgoingMsg.setFlip(ifFilpWithSuccessor());
-// outgoingMsg.setSourceVertexId(getVertexId());
-// for(byte d: OutgoingListFlag.values)
-// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
-// outgoingMsg.setNode(getVertexValue().getNode());
-//// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
-// sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-// }
-//
-// public void configureMergeMsgForSuccessorByIncomingMsg(){
-// setSuccessorToMeDir();
-// outgoingMsg.setFlag(outFlag);
-// outgoingMsg.setFlip(ifFlipWithPredecessor());
-// outgoingMsg.setSourceVertexId(getVertexId());
-// for(byte d: IncomingListFlag.values)
-// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
-// outgoingMsg.setNode(getVertexValue().getNode());
-//// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
-// sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-// }
/**
- * send merge message to neighber for P4
+ * send merge message to neighbor for P4: send the message to the merge target and kill self
*/
- public void broadcastMergeMsg(boolean deleteSelf){
+ public void broadcastMergeMsg(boolean isP4){
outFlag |= getHeadMergeDir();
switch(getVertexValue().getState() & State.CAN_MERGE_MASK) {
case State.CAN_MERGEWITHNEXT:
// configure merge msg for successor
configureMergeMsgForSuccessor(getNextDestVertexId()); // TODO getDestVertexId(DIRECTION), then remove the switch statement, sendMergeMsg(DIRECTION)
- if(deleteSelf)
+ if(isP4)
deleteVertex(getVertexId());
else{
getVertexValue().setState(State.IS_DEAD);
@@ -393,7 +318,7 @@
case State.CAN_MERGEWITHPREV:
// configure merge msg for predecessor
configureMergeMsgForPredecessor(getPrevDestVertexId());
- if(deleteSelf)
+ if(isP4)
deleteVertex(getVertexId());
else{
getVertexValue().setState(State.IS_DEAD);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 0683587..0e43c61 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -170,10 +170,10 @@
byte headMergeDir = (byte)(getVertexValue().getState() & State.HEAD_CAN_MERGE_MASK);
switch(headMergeDir){
case State.HEAD_CAN_MERGEWITHPREV:
- sendUpdateMsgToSuccessor(true);
+ sendUpdateMsg(isP1, toSuccessor);
break;
case State.HEAD_CAN_MERGEWITHNEXT:
- sendUpdateMsgToPredecessor(true);
+ sendUpdateMsg(isP1, toPredecessor);
break;
}
} else
@@ -186,7 +186,7 @@
public void processUpdateOnceReceiveMsg(Iterator<PathMergeMessageWritable> msgIterator){
while(msgIterator.hasNext()){
incomingMsg = msgIterator.next();
- processUpdate();
+ processUpdate(incomingMsg);
if(isHaltNode())
voteToHalt();
else
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index 865c1cc..0e67efc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -254,7 +254,7 @@
* configure MERGE msg For P2
*/
public void configureP2MergeMsgForPredecessor(VKmerBytesWritable mergeDest){
- setPredecessorToMeDir();
+ setNeighborToMeDir(predecessorToMe);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlip(ifFilpWithSuccessor());
@@ -263,7 +263,7 @@
}
public void configureP2MergeMsgForSuccessor(VKmerBytesWritable mergeDest){
- setSuccessorToMeDir();
+ setNeighborToMeDir(successorToMe);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setFlip(ifFlipWithPredecessor());
@@ -563,7 +563,7 @@
voteToHalt();
} else if(incomingMsg.isUpdateMsg() && (selfFlag == State.IS_OLDHEAD || isValidUpateNode())){// only old head update edges
if(!isHaltNode())
- processUpdate();
+ processUpdate(incomingMsg);
voteToHalt();
} else if(isFinalMergeMsg()){// for final processing, receive msg from head, which means final merge (2) ex. 2, 8
sendFinalMergeMsg();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index c61e4b7..1e20fb6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -11,7 +11,6 @@
import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
import edu.uci.ics.genomix.type.NodeWritable.IncomingListFlag;
import edu.uci.ics.genomix.type.NodeWritable.OutgoingListFlag;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -23,7 +22,7 @@
*/
public class P4ForPathMergeVertex extends
BasicPathMergeVertex<VertexValueWritable, PathMergeMessageWritable> {
-
+
private static long randSeed = 1; //static for save memory
private float probBeingRandomHead = -1;
private Random randGenerator = null;
@@ -89,6 +88,9 @@
protected boolean setPrevInfo(VertexValueWritable value) {
if(getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT)
return false;
+ if (isTandemRepeat(value)) {
+ return false;
+ }
if (value.inDegree() == 1) {
for(byte dir : IncomingListFlag.values){
if(value.getEdgeList(dir).getCountOfPosition() > 0){
@@ -107,6 +109,9 @@
protected boolean setNextInfo(VertexValueWritable value) {
if(getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV)
return false;
+ if (isTandemRepeat(value)) {
+ return false;
+ }
// TODO make sure the degree is correct
if (value.outDegree() == 1) {
for(byte dir : OutgoingListFlag.values){
@@ -119,7 +124,108 @@
}
return false;
}
-
+
+ /**
+ * step1 : sendUpdates
+ */
+ public void sendUpdates(){
+ //initiate merge_dir
+ setStateAsNoMerge();
+
+ // only PATH vertices are present. Find the ID's for my neighbors
+ curKmer = getVertexId();
+ curHead = isNodeRandomHead(curKmer);
+
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(getVertexValue()); // TODO make this false if the node is restricted by its neighbors or by structure(when you combine steps 2 and 3)
+ hasPrev = setPrevInfo(getVertexValue());
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ setStateAsMergeDir(mergeWithNext);
+ sendUpdateMsg(isP4, toPredecessor);
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ setStateAsMergeDir(mergeWithPrev);
+ sendUpdateMsg(isP4, toSuccessor);
+ }
+ }
+ else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ setStateAsMergeDir(mergeWithNext);
+ sendUpdateMsg(isP4, toPredecessor);
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
+ // merge towards tail in forward dir
+ setStateAsMergeDir(mergeWithNext);
+ sendUpdateMsg(isP4, toPredecessor);
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
+ // merge towards tail in reverse dir
+ setStateAsMergeDir(mergeWithPrev);
+ sendUpdateMsg(isP4, toSuccessor);
+ }
+ }
+ }
+ } // TODO else voteToHalt (when I combine steps 2 and 3)
+ this.activate();
+ }
+
+ /**
+ * step2: receiveUpdates
+ */
+ public void receiveUpdates(Iterator<PathMergeMessageWritable> msgIterator){
+ //update neighber
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ processUpdate(incomingMsg);
+ }
+ if(isInactiveNode() || isHeadUnableToMerge()) // check structure and neighbor restriction
+ voteToHalt();
+ else
+ activate();
+ }
+
+ /**
+ * step4: processMerges
+ */
+ public void receiveMerges(Iterator<PathMergeMessageWritable> msgIterator){
+ //merge tmpKmer
+ while (msgIterator.hasNext()) {
+ boolean selfFlag = (getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV || getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT);
+ incomingMsg = msgIterator.next();
+ /** process merge **/
+ processMerge(incomingMsg);
+ // set statistics counter: Num_MergedNodes
+ updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
+ /** if it's a tandem repeat, which means detecting cycle **/
+ if(isTandemRepeat(getVertexValue())){
+ // set statistics counter: Num_Cycles
+ updateStatisticsCounter(StatisticsCounter.Num_Cycles);
+ voteToHalt(); // TODO make sure you're checking structure to preclude tandem repeats
+ }/** head meets head, stop **/
+ else if(!VertexUtil.isCanMergeVertex(getVertexValue()) || isHeadMeetsHead(selfFlag)){
+ getVertexValue().setState(State.HEAD_CANNOT_MERGE);
+ // set statistics counter: Num_MergedPaths
+ updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
+ voteToHalt();
+ }else{
+ activate();
+ }
+ getVertexValue().setCounters(counters);
+ }
+ }
+
@Override
public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
initVertex();
@@ -127,111 +233,14 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 4 == 3){
- // TODO separate function for this block
- //initiate merge_dir
- setStateAsNoMerge();
-
- // only PATH vertices are present. Find the ID's for my neighbors
- curKmer = getVertexId();
- curHead = isNodeRandomHead(curKmer);
-
- // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
- // We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue()); // TODO make this false if the node is restricted by its neighbors or by structure
- hasPrev = setPrevInfo(getVertexValue());
- if (isTandemRepeat(getVertexValue())) {
- hasNext = false;
- hasPrev = false;
- }
- if (hasNext || hasPrev) {
- if (curHead) {
- if (hasNext && !nextHead) {
- // compress this head to the forward tail
- setStateAsMergeWithNext();
-// configureUpdateMsg(isNextOrPrevious?, getVertexValue()); // TODO change to sendmsg or sendUpdateFrom...
- sendUpdateMsgToPredecessor(true); //TODO all of these can be simplified
- } else if (hasPrev && !prevHead) {
- // compress this head to the reverse tail
- setStateAsMergeWithPrev();
- sendUpdateMsgToSuccessor(true);
- }
- }
- else {
- // I'm a tail
- if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
- // tails on both sides, and I'm the "local minimum"
- // compress me towards the tail in forward dir
- setStateAsMergeWithNext();
- sendUpdateMsgToPredecessor(true);
- }
- } else if (!hasPrev) {
- // no previous node
- if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
- // merge towards tail in forward dir
- setStateAsMergeWithNext();
- sendUpdateMsgToPredecessor(true);
- }
- } else if (!hasNext) {
- // no next node
- if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
- // merge towards tail in reverse dir
- setStateAsMergeWithPrev();
- sendUpdateMsgToSuccessor(true);
- }
- }
- }
- } // TODO else voteToHalt (when you combine steps 2 and 3)
- this.activate();
- }
- else if (getSuperstep() % 4 == 0){
- // TODO separate function for this step
- //update neighber
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- processUpdate(); // TODO pass incomingMsg as a parameter
-
- // TODO move outside the loop
- if(isInactiveNode() || isHeadUnableToMerge()) // check structure and neighbor restriction
- voteToHalt();
- else
- activate();
- }
- } else if (getSuperstep() % 4 == 1){
- //send message to the merge object and kill self
+ else if (getSuperstep() % 4 == 3)
+ sendUpdates();
+ else if (getSuperstep() % 4 == 0)
+ receiveUpdates(msgIterator);
+ else if (getSuperstep() % 4 == 1){
broadcastMergeMsg(true);
- } else if (getSuperstep() % 4 == 2){
- //merge tmpKmer
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- boolean selfFlag = (getHeadMergeDir() == State.HEAD_CAN_MERGEWITHPREV || getHeadMergeDir() == State.HEAD_CAN_MERGEWITHNEXT);
- /** process merge **/
- processMerge(); // TODO use incomingMsg as a parameter
- // set statistics counter: Num_MergedNodes
- updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
- /** if it's a tandem repeat, which means detecting cycle **/
- if(isTandemRepeat(getVertexValue())){ // TODO check 3 node cycle to make sure the update is cocrect (try several times)
- for(byte d : DirectionFlag.values)
- getVertexValue().getEdgeList(d).reset(); // TODO don't remove tandem repeats but DO stop merging // we shouldn't need to update neighbors
- // set statistics counter: Num_TandemRepeats
- updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats); // TODO cycle instead of tandem repeat
- getVertexValue().setCounters(counters);
- voteToHalt(); // TODO make sure you're checking structure to preclude tandem repeats
- }/** head meets head, stop **/
- else if(!VertexUtil.isCanMergeVertex(getVertexValue()) || isHeadMeetsHead(selfFlag)){
- getVertexValue().setState(State.HEAD_CANNOT_MERGE);
- // set statistics counter: Num_MergedPaths
- updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
- getVertexValue().setCounters(counters);
- voteToHalt();
- }
- else{
- getVertexValue().setCounters(counters); // TODO move all the setCounter calls outside the if/else blocks
- activate();
- }
- }
- }
+ } else if (getSuperstep() % 4 == 2)
+ receiveMerges(msgIterator);
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
index 98f82d3..ee2a683 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
@@ -10,6 +10,7 @@
public static final byte Num_RemovedBridges = 0b0110 << 0;
public static final byte Num_SplitRepeats = 0b0111 << 0;
public static final byte Num_Scaffodings = 0b1000 << 0;
+ public static final byte Num_Cycles = 0b1001 << 0;
public final static class COUNTER_CONTENT{
public static String getContent(byte code){
@@ -42,6 +43,9 @@
case Num_Scaffodings:
r = "num of scaffoldings";
break;
+ case Num_Cycles:
+ r = "num of cycles";
+ break;
}
return r;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
index 13f264a..71444a6 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
@@ -6,22 +6,22 @@
public static Test suite() throws Exception {
String pattern ="PathMerge";
- String testSet[] = {
- "2", "3", "4", "5", "6", "7", "8", "9", "head_6", "head_7",
- "P2_3", "P2_4", "P2_5", "P2_6", "P2_7", "P2_8",
- "LeftAdj", "RightAdj",
- "FR", "RF", "head_FR", "head_RF", "twohead_FR", "twohead_RF",
- "SelfTandemRepeat", "TandemRepeatWithMergeEdge",
- "TandemRepeatWithUnmergeEdge", "ComplexTandemRepeat",
- "SimplePath", "ThreeDuplicate",
- "SimpleBridgePath", "BridgePathWithTandemRepeat",
- "RingPath", "CyclePath",
- "SimpleTreePath", "ComplexTreePath",
- "Triangle", "Rectangle",
- "synthetic",
- "MultiTandemRepeat", "MultiTandemRepeat2", "MultiTandemRepeat3",
- "TandemRepeatWithSmallCycle", "TandemRepeatAndCycle",
- "sameWithEdge", "FR_RF_Simple"
+ String testSet[] = {"ThreeNodesCycle", "RingPath", "SimpleTreePath", "RingPath", "CyclePath", "9", "P2_8", "ComplexTandemRepeat"
+// "2", "3", "4", "5", "6", "7", "8", "9", "head_6", "head_7",
+// "P2_3", "P2_4", "P2_5", "P2_6", "P2_7", "P2_8",
+// "LeftAdj", "RightAdj",
+// "FR", "RF", "head_FR", "head_RF", "twohead_FR", "twohead_RF",
+// "SelfTandemRepeat", "TandemRepeatWithMergeEdge",
+// "TandemRepeatWithUnmergeEdge", "ComplexTandemRepeat",
+// "SimplePath", "ThreeDuplicate",
+// "SimpleBridgePath", "BridgePathWithTandemRepeat",
+// "RingPath", "CyclePath",
+// "SimpleTreePath", "ComplexTreePath",
+// "Triangle", "Rectangle",
+// "synthetic",
+// "MultiTandemRepeat", "MultiTandemRepeat2", "MultiTandemRepeat3",
+// "TandemRepeatWithSmallCycle", "TandemRepeatAndCycle",
+// "sameWithEdge", "FR_RF_Simple"
// "SmallGenome_5",
};