Make pathmerge robust
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
index b412cda..3029d27 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/EdgeListWritable.java
@@ -96,6 +96,14 @@
return total;
}
+ public EdgeWritable getEdge(VKmerBytesWritable key){
+ for(EdgeWritable edge : edges){
+ if(edge.getKey().equals(key)){
+ return edge;
+ }
+ }
+ return null;
+ }
/**
* Return this Edge's representation as a new byte array
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index aa79c3a..9dbe50f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -313,22 +313,22 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithManyOutgoing(getVertexValue())) {
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())) {
outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
sendMsgToAllNextNodes();
voteToHalt();
}
- if (VertexUtil.isRearVertexWithManyIncoming(getVertexValue())) {
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
sendMsgToAllPreviousNodes();
voteToHalt();
}
- if (VertexUtil.isHeadVertexWithOnlyOneOutgoing(getVertexValue())){
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())){
outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
- if (VertexUtil.isRearVertexWithOnlyOneIncoming(getVertexValue())){
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
index 4f1b4a5..6ea0442 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.EdgeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -25,6 +26,7 @@
private byte repeatDir = 0;
private VertexValueWritable tmpValue = new VertexValueWritable();
+ private EdgeWritable tmpEdge = new EdgeWritable();
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
@@ -54,6 +56,21 @@
return false;
}
+ public byte mirrorDirection(byte dir) {
+ switch (dir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RF;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_FF;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+ }
+ }
+
public byte flipDir(byte dir){
switch(dir){
case DirectionFlag.DIR_FF:
@@ -64,8 +81,9 @@
return DirectionFlag.DIR_FF;
case DirectionFlag.DIR_RR:
return DirectionFlag.DIR_FR;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
}
- return 0;
}
/**
@@ -77,12 +95,10 @@
boolean hasFlip = false;
/** pick one edge and flip **/
for(byte d : DirectionFlag.values){
- for(EdgeWritable edge : getVertexValue().getEdgeList(d)){
- byte flipDir = flipDir(repeatDir);
- getVertexValue().getEdgeList(flipDir).add(edge);
- getVertexValue().getEdgeList(d).remove(edge);
- /** send flip message to node for updating edgeDir **/
- outgoingMsg.setFlag(flipDir);
+ for(EdgeWritable edge : tmpValue.getEdgeList(d)){
+ byte flipDir = flipDir(d);
+ tmpValue.getEdgeList(flipDir).add(edge);
+ tmpValue.getEdgeList(d).remove(edge);
/** setup hasFlip to go out of the loop **/
hasFlip = true;
break;
@@ -104,6 +120,24 @@
public void mergeTandemRepeat(){
getVertexValue().getInternalKmer().mergeWithKmerInDir(repeatDir, kmerSize, getVertexId());
getVertexValue().getEdgeList(repeatDir).remove(getVertexId());
+ boolean hasFlip = false;
+ /** pick one edge and flip **/
+ for(byte d : DirectionFlag.values){
+ for(EdgeWritable edge : getVertexValue().getEdgeList(d)){
+ byte flipDir = flipDir(d);
+ getVertexValue().getEdgeList(flipDir).add(edge);
+ getVertexValue().getEdgeList(d).remove(edge);
+ /** send flip message to node for updating edgeDir **/
+ outgoingMsg.setFlag(flipDir);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(edge.getKey(), outgoingMsg);
+ /** setup hasFlip to go out of the loop **/
+ hasFlip = true;
+ break;
+ }
+ if(hasFlip)
+ break;
+ }
}
@Override
@@ -114,6 +148,18 @@
mergeTandemRepeat();
}
voteToHalt();
+ } else if(getSuperstep() == 2){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ /** update edge **/
+ byte flipDir = flipDir(incomingMsg.getFlag());
+ byte prevNeighborToMe = mirrorDirection(flipDir);
+ byte curNeighborToMe = mirrorDirection(incomingMsg.getFlag());
+ tmpEdge.setAsCopy(getVertexValue().getEdgeList(prevNeighborToMe).getEdge(incomingMsg.getSourceVertexId()));
+ getVertexValue().getEdgeList(prevNeighborToMe).remove(incomingMsg.getSourceVertexId());
+ getVertexValue().getEdgeList(curNeighborToMe).add(tmpEdge);
+ }
+ voteToHalt();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 0b396fb..2939dd8 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -83,7 +83,7 @@
private static void generateP2ForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(P2ForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(VKmerBytesWritable.class);
@@ -118,7 +118,7 @@
private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(VKmerBytesWritable.class);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
index 3468f85..970a6ae 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
@@ -10,7 +10,7 @@
{
// "SimplePath",
// "ThreeDuplicate",
- "BridgePath",
+ "BridgePath_AfterUnroll"
// "CyclePath",
// "SelfPath"
// "TreePath"
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P2ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P2ForMergeGraph.xml
index d6a0759..4311d02 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P2ForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/P2ForMergeGraph.xml
@@ -122,7 +122,7 @@
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
index 16e0c69..b0f07b3 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
@@ -122,7 +122,7 @@
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>
diff --git a/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt b/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
index 1ecfd69..3d007d2 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_PathMerge.txt
@@ -1 +1 @@
-P2ForMergeGraph.xml
+P4ForMergeGraph.xml