complete MergeBubbleVertex & wait for compute fracSimilarity
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 70ef13f..245fe85 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -9,7 +9,9 @@
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
-public class VertexValueWritable implements WritableComparable<VertexValueWritable> {
+public class VertexValueWritable implements WritableComparable<VertexValueWritable>, Serializable {
+
+ private static final long serialVersionUID = 1L;
public static class State extends VertexStateFlag{
public static final byte NO_MERGE = 0b00 << 3;
@@ -18,10 +20,12 @@
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
+ public static final byte UNCHANGE = 0b0 << 3;
public static final byte KILL = 0b1 << 3;
public static final byte KILL_MASK = 0b1 << 3;
public static final byte DIR_FROM_DEADVERTEX = 0b10 << 3;
+ public static final byte DEAD_MASK = 0b10 << 3;
}
public static class VertexStateFlag extends FakeFlag {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 83686fb..c92148f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -3,8 +3,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -22,10 +24,12 @@
*/
public class BubbleMergeVertex extends
BasicGraphCleanVertex {
-
+ public static final String DISSIMILARITY_THRESHOLD = "BubbleMergeVertex.dissimilarThreshold";
+ private float dissimilarThreshold = -1;
+
private Map<VKmerBytesWritable, ArrayList<MessageWritable>> receivedMsgMap = new HashMap<VKmerBytesWritable, ArrayList<MessageWritable>>();
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
-
+
/**
* initiate kmerSize, maxIteration
*/
@@ -34,6 +38,8 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(dissimilarThreshold == -1)
+ dissimilarThreshold = getContext().getConfiguration().getFloat(DISSIMILARITY_THRESHOLD, (float) 0.05);
if(incomingMsg == null)
incomingMsg = new MessageWritable();
if(outgoingMsg == null)
@@ -42,6 +48,7 @@
outgoingMsg.reset(kmerSize);
if(destVertexId == null)
destVertexId = new VKmerBytesWritable();
+ outFlag = 0;
}
public void sendBubbleAndMajorVertexMsgToMinorVertex(){
@@ -109,18 +116,47 @@
/** aggregate bubble nodes and grouped by major vertex **/
aggregateBubbleNodesByMajorNode(msgIterator);
+ Set<MessageWritable> unchangedSet = new HashSet<MessageWritable>();
+ Set<MessageWritable> deletedSet = new HashSet<MessageWritable>();
for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
if(receivedMsgList.size() > 1){ // filter bubble
/** for each startVertex, sort the node by decreasing order of coverage **/
receivedMsgList = receivedMsgMap.get(prevId);
Collections.sort(receivedMsgList, new MessageWritable.SortByCoverage());
- System.out.println("");
-
/** process similarSet, keep the unchanged set and deleted set & add coverage to unchange node **/
+ MessageWritable topCoverageMessage = new MessageWritable();
+ MessageWritable tmpMessage = new MessageWritable();
+ Iterator<MessageWritable> it;
+ while(!receivedMsgList.isEmpty()){
+ it = receivedMsgList.iterator();
+ topCoverageMessage.set(it.next());
+ it.remove(); //delete topCoverage node
+ while(it.hasNext()){
+ tmpMessage.set(it.next());
+ //compute the similarity
+ float fracDissimilar = (float) 0.02;
+ if(fracDissimilar < dissimilarThreshold){ //If similar with top node, delete this node and put it in deletedSet
+ //TODO add coverage to top node
+ deletedSet.add(tmpMessage);
+ it.remove();
+ }
+ }
+ unchangedSet.add(topCoverageMessage);
+ }
/** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
-
+ for(MessageWritable msg : unchangedSet){
+ outFlag = MessageFlag.UNCHANGE;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setAverageCoverage(msg.getAverageCoverage());
+ sendMsg(msg.getSourceVertexId(), outgoingMsg);
+ }
+ for(MessageWritable msg : deletedSet){
+ outFlag = MessageFlag.KILL;
+ outgoingMsg.setFlag(outFlag);
+ sendMsg(msg.getSourceVertexId(), outgoingMsg);
+ }
}
}
} else if (getSuperstep() == 4){
@@ -128,12 +164,14 @@
incomingMsg = msgIterator.next();
if(incomingMsg.getFlag() == MessageFlag.KILL){
broadcaseKillself();
- }
+ } else if(incomingMsg.getFlag() == MessageFlag.UNCHANGE){
+ getVertexValue().setAverageCoverage(incomingMsg.getAverageCoverage());
+ }
}
} else if(getSuperstep() == 5){
if(msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getFlag() == MessageFlag.KILL){
+ if(isResponseKillMsg()){
responseToDeadVertex();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 080cf35..58ef4d6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -22,14 +22,14 @@
public static final String KMER_SIZE = "BasicGraphCleanVertex.kmerSize";
public static final String ITERATIONS = "BasicGraphCleanVertex.iteration";
public static int kmerSize = -1;
- protected int maxIteration = -1;
+ public static int maxIteration = -1;
protected MessageWritable incomingMsg = null;
protected MessageWritable outgoingMsg = null;
protected VKmerBytesWritable destVertexId = null;
protected Iterator<VKmerBytesWritable> kmerIterator;
- protected VKmerBytesWritable tmpKmer = new VKmerBytesWritable(kmerSize);
- byte headFlag;
+ protected VKmerBytesWritable tmpKmer = null;
+ protected byte headFlag;
protected byte outFlag;
protected byte inFlag;
protected byte selfFlag;
@@ -814,9 +814,16 @@
}
}
- public boolean isKillMsg(){
+ public boolean isReceiveKillMsg(){
byte killFlag = (byte) (incomingMsg.getFlag() & MessageFlag.KILL_MASK);
- return killFlag == MessageFlag.KILL;
+ byte deadFlag = (byte) (incomingMsg.getFlag() & MessageFlag.DEAD_MASK);
+ return killFlag == MessageFlag.KILL & deadFlag != MessageFlag.DIR_FROM_DEADVERTEX;
+ }
+
+ public boolean isResponseKillMsg(){
+ byte killFlag = (byte) (incomingMsg.getFlag() & MessageFlag.KILL_MASK);
+ byte deadFlag = (byte) (incomingMsg.getFlag() & MessageFlag.DEAD_MASK);
+ return killFlag == MessageFlag.KILL & deadFlag == MessageFlag.DIR_FROM_DEADVERTEX;
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 10137da..5baf492 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -171,7 +171,7 @@
} else if(getSuperstep() == 5){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(isKillMsg())
+ if(isReceiveKillMsg())
responseToDeadVertex();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index 6b7ad1f..c042f49 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -249,7 +249,7 @@
processFinalMerge(incomingMsg);
/** NON-FAKE and Final vertice send msg to FAKE vertex **/
sendMsgToFakeVertex();
- } else if(isKillMsg()){
+ } else if(isReceiveKillMsg()){
responseToDeadVertex();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 9468681..3ce6c5a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -2,11 +2,6 @@
public class MessageFlag extends DirectionFlag {
- //public static final byte FLIP = 1 << 6;
- public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b101 << 0;
- public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b111 << 0;
-
-
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
return "ERROR_BAD_MESSAGE";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index fb51e52..fe82add 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -13,6 +13,7 @@
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;