add computeSimilarity and mergeAverageCoverage to MergeBubbleVertex
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 5cb3169..bfb1e64 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -19,7 +20,7 @@
* file stores the point to the file that stores the chains of connected DNA
*/
private VKmerBytesWritable sourceVertexId;
- private VKmerBytesWritable kmer;
+ private VKmerBytesWritable actualKmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private PositionListWritable nodeIdList = new PositionListWritable();
private float averageCoverage;
@@ -33,7 +34,7 @@
public MessageWritable() {
sourceVertexId = new VKmerBytesWritable();
- kmer = new VKmerBytesWritable();
+ actualKmer = new VKmerBytesWritable();
neighberNode = new AdjacencyListWritable();
startVertexId = new VKmerBytesWritable();
averageCoverage = 0;
@@ -45,7 +46,7 @@
public MessageWritable(int kmerSize) {
kmerlength = kmerSize;
sourceVertexId = new VKmerBytesWritable(kmerSize);
- kmer = new VKmerBytesWritable(0);
+ actualKmer = new VKmerBytesWritable(0);
neighberNode = new AdjacencyListWritable(kmerSize);
startVertexId = new VKmerBytesWritable(kmerSize);
@@ -62,9 +63,9 @@
checkMessage |= CheckMessage.SOURCE;
this.sourceVertexId.setAsCopy(msg.getSourceVertexId());
}
- if (kmer != null) {
+ if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.setAsCopy(msg.getActualKmer());
+ this.actualKmer.setAsCopy(msg.getActualKmer());
}
if (neighberNode != null) {
@@ -89,7 +90,7 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.setAsCopy(chainVertexId);
+ this.actualKmer.setAsCopy(chainVertexId);
}
if (neighberNode != null) {
@@ -106,7 +107,7 @@
public void reset(int kmerSize) {
checkMessage = (byte) 0;
kmerlength = kmerSize;
-// kmer.reset();
+// actualKmer.reset();
neighberNode.reset(kmerSize);
startVertexId.reset(kmerSize);
averageCoverage = 0;
@@ -126,25 +127,25 @@
}
public VKmerBytesWritable getActualKmer() {
- return kmer;
+ return actualKmer;
}
public void setActualKmer(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.setAsCopy(actualKmer);
+ this.actualKmer.setAsCopy(actualKmer);
}
}
public VKmerBytesWritable getCreatedVertexId() {
- return kmer;
+ return actualKmer;
}
public void setCreatedVertexId(VKmerBytesWritable actualKmer) {
if (actualKmer != null) {
checkMessage |= CheckMessage.ACUTUALKMER;
- this.kmer.setAsCopy(actualKmer);
+ this.actualKmer.setAsCopy(actualKmer);
}
}
@@ -180,7 +181,7 @@
}
public int getLengthOfChain() {
- return kmer.getKmerLetterLength();
+ return actualKmer.getKmerLetterLength();
}
public byte getFlag() {
@@ -226,7 +227,7 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
- kmer.write(out);
+ actualKmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
@@ -247,7 +248,7 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
if ((checkMessage & CheckMessage.ACUTUALKMER) != 0)
- kmer.readFields(in);
+ actualKmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
@@ -290,4 +291,16 @@
return Float.compare(left.averageCoverage, right.averageCoverage);
}
}
+
+ /**
+ * Update my coverage to be the average of this and other. Used when merging paths.
+ */
+ public void mergeCoverage(MessageWritable other) {
+ // sequence considered in the average doesn't include anything overlapping with other kmers
+ float adjustedLength = actualKmer.getKmerLetterLength() + other.actualKmer.getKmerLetterLength() - (KmerBytesWritable.getKmerLength() - 1) * 2;
+
+ float myCount = (actualKmer.getKmerLetterLength() - KmerBytesWritable.getKmerLength() - 1) * averageCoverage;
+ float otherCount = (other.actualKmer.getKmerLetterLength() - KmerBytesWritable.getKmerLength() - 1) * other.averageCoverage;
+ averageCoverage = (myCount + otherCount) / adjustedLength;
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 245fe85..3a37794 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -6,6 +6,8 @@
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
@@ -360,5 +362,4 @@
}
}
-
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index c92148f..c97244a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -29,6 +29,8 @@
private Map<VKmerBytesWritable, ArrayList<MessageWritable>> receivedMsgMap = new HashMap<VKmerBytesWritable, ArrayList<MessageWritable>>();
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
+ private Set<MessageWritable> unchangedSet = new HashSet<MessageWritable>();
+ private Set<MessageWritable> deletedSet = new HashSet<MessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -96,6 +98,48 @@
}
}
+ public void processSimilarSetToUnchangeSetAndDeletedSet(){
+ unchangedSet.clear();
+ deletedSet.clear();
+ MessageWritable topCoverageMessage = new MessageWritable();
+ MessageWritable tmpMessage = new MessageWritable();
+ Iterator<MessageWritable> it;
+ while(!receivedMsgList.isEmpty()){
+ it = receivedMsgList.iterator();
+ topCoverageMessage.set(it.next());
+ it.remove(); //delete topCoverage node
+ while(it.hasNext()){
+ tmpMessage.set(it.next());
+ //compute the similarity
+ float fracDissimilar = topCoverageMessage.getSourceVertexId().fracDissimilar(tmpMessage.getSourceVertexId());
+ if(fracDissimilar < dissimilarThreshold){ //If similar with top node, delete this node and put it in deletedSet
+ //add coverage to top node
+ topCoverageMessage.mergeCoverage(tmpMessage);
+ deletedSet.add(tmpMessage);
+ it.remove();
+ }
+ }
+ unchangedSet.add(topCoverageMessage);
+ }
+ }
+
+ public void processUnchangedSet(){
+ for(MessageWritable msg : unchangedSet){
+ outFlag = MessageFlag.UNCHANGE;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setAverageCoverage(msg.getAverageCoverage());
+ sendMsg(msg.getSourceVertexId(), outgoingMsg);
+ }
+ }
+
+ public void processDeletedSet(){
+ for(MessageWritable msg : deletedSet){
+ outFlag = MessageFlag.KILL;
+ outgoingMsg.setFlag(outFlag);
+ sendMsg(msg.getSourceVertexId(), outgoingMsg);
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -116,8 +160,7 @@
/** aggregate bubble nodes and grouped by major vertex **/
aggregateBubbleNodesByMajorNode(msgIterator);
- Set<MessageWritable> unchangedSet = new HashSet<MessageWritable>();
- Set<MessageWritable> deletedSet = new HashSet<MessageWritable>();
+
for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
if(receivedMsgList.size() > 1){ // filter bubble
/** for each startVertex, sort the node by decreasing order of coverage **/
@@ -125,38 +168,11 @@
Collections.sort(receivedMsgList, new MessageWritable.SortByCoverage());
/** process similarSet, keep the unchanged set and deleted set & add coverage to unchange node **/
- MessageWritable topCoverageMessage = new MessageWritable();
- MessageWritable tmpMessage = new MessageWritable();
- Iterator<MessageWritable> it;
- while(!receivedMsgList.isEmpty()){
- it = receivedMsgList.iterator();
- topCoverageMessage.set(it.next());
- it.remove(); //delete topCoverage node
- while(it.hasNext()){
- tmpMessage.set(it.next());
- //compute the similarity
- float fracDissimilar = (float) 0.02;
- if(fracDissimilar < dissimilarThreshold){ //If similar with top node, delete this node and put it in deletedSet
- //TODO add coverage to top node
- deletedSet.add(tmpMessage);
- it.remove();
- }
- }
- unchangedSet.add(topCoverageMessage);
- }
+ processSimilarSetToUnchangeSetAndDeletedSet();
/** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
- for(MessageWritable msg : unchangedSet){
- outFlag = MessageFlag.UNCHANGE;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setAverageCoverage(msg.getAverageCoverage());
- sendMsg(msg.getSourceVertexId(), outgoingMsg);
- }
- for(MessageWritable msg : deletedSet){
- outFlag = MessageFlag.KILL;
- outgoingMsg.setFlag(outFlag);
- sendMsg(msg.getSourceVertexId(), outgoingMsg);
- }
+ processUnchangedSet();
+ processDeletedSet();
}
}
} else if (getSuperstep() == 4){
@@ -165,6 +181,7 @@
if(incomingMsg.getFlag() == MessageFlag.KILL){
broadcaseKillself();
} else if(incomingMsg.getFlag() == MessageFlag.UNCHANGE){
+ /** update average coverage **/
getVertexValue().setAverageCoverage(incomingMsg.getAverageCoverage());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 4582557..e2e7dfb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -2,6 +2,7 @@
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
public class VertexUtil {
@@ -124,4 +125,5 @@
else
return null;
}
+
}
diff --git a/genomix/genomix-pregelix/src/test/resources/only_bubbleadd.txt b/genomix/genomix-pregelix/src/test/resources/only_bubbleadd.txt
new file mode 100644
index 0000000..a4c7fc8
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/resources/only_bubbleadd.txt
@@ -0,0 +1 @@
+BubbleAddGraph.xml