BubbleMergeVertex: migrate to BasicGraphCleanVertex (half done); rename getPreDestVertexId to getPrevDestVertexId
diff --git a/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
new file mode 100644
index 0000000..472a7dc
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/RemoveBridge.txt
@@ -0,0 +1,2 @@
+1 AATAG
+2 CACGC
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index d630b5f..e92eaed 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -11,63 +11,27 @@
import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: MessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
/**
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MergeBubbleMessageWritable> {
- public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
- public static final String ITERATIONS = "BubbleMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
+ BasicGraphCleanVertex {
- private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
- private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private Iterator<PositionWritable> iterator;
- private PositionWritable pos = new PositionWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
- private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsgMap = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
- private ArrayList<MergeBubbleMessageWritable> receivedMsgList = new ArrayList<MergeBubbleMessageWritable>();
+ private Map<KmerBytesWritable, ArrayList<MessageWritable>> receivedMsgMap = new HashMap<KmerBytesWritable, ArrayList<MessageWritable>>();
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -79,188 +43,39 @@
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
outgoingMsg.reset();
}
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPrevDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getRFList().iterator();
- else // #FRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * check if prev/next destination exists
- */
- public boolean hasNextDest(VertexValueWritable value){
- return value.getFFList().getCountOfPosition() > 0 || value.getFRList().getCountOfPosition() > 0;
- }
-
- public boolean hasPrevDest(VertexValueWritable value){
- return value.getRFList().getCountOfPosition() > 0 || value.getRRList().getCountOfPosition() > 0;
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllPrevNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // FFList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // FRList
- while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * broadcast kill self to all neighbers Pre-condition: vertex is a path vertex
- */
- public void broadcaseKillself(){
- outgoingMsg.setSourceVertexId(getVertexId());
-
- if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
-
-
- if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
- else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
- }
-
- deleteVertex(getVertexId());
- }
-
- /**
- * do some remove operations on adjMap after receiving the info about dead Vertex
- */
- public void responseToDeadVertex(Iterator<MergeBubbleMessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
- //remove incomingMsg.getSourceId from RR positionList
- iterator = getVertexValue().getRRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
- //remove incomingMsg.getSourceId from RF positionList
- iterator = getVertexValue().getFRList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
- //remove incomingMsg.getSourceId from FR positionList
- iterator = getVertexValue().getRFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
- //remove incomingMsg.getSourceId from FF positionList
- iterator = getVertexValue().getFFList().iterator();
- while(iterator.hasNext()){
- pos = iterator.next();
- if(pos.equals(incomingMsg.getSourceVertexId())){
- iterator.remove();
- break;
- }
- }
- }
- }
- }
@SuppressWarnings("unchecked")
@Override
- public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
+ public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
if(VertexUtil.isHeadVertexWithIndegree(getVertexValue())
|| VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsgToAllNextNodes(getVertexValue());
+ sendSettledMsgToAllNextNodes(getVertexValue());
}
-// if(VertexUtil.isRearVertexWithOutdegree(getVertexValue())
-// || VertexUtil.isRearWithoutOutdegree(getVertexValue())){
-// outgoingMsg.setSourceVertexId(getVertexId());
-// sendMsgToAllPrevNodes(getVertexValue());
-// }
} else if (getSuperstep() == 2){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(VertexUtil.isPathVertex(getVertexValue())){
- switch(incomingMsg.getMessage()){
- case AdjMessage.FROMFF:
- case AdjMessage.FROMRF:
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
if(hasNextDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
break;
- case AdjMessage.FROMFR:
- case AdjMessage.FROMRR:
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
if(hasPrevDest(getVertexValue())){
- outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
destVertexId.set(getPrevDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -274,32 +89,23 @@
if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
receivedMsgList.clear();
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
else{
receivedMsgList.clear();
receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
}
}
- for(PositionWritable prevId : receivedMsgMap.keySet()){
+ for(KmerBytesWritable prevId : receivedMsgMap.keySet()){
receivedMsgList = receivedMsgMap.get(prevId);
if(receivedMsgList.size() > 1){
- //find the node with largest length of Kmer
- boolean flag = true; //the same length
- int maxLength = receivedMsgList.get(0).getLengthOfChain();
- PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
- PositionWritable secondMax = receivedMsgList.get(0).getSourceVertexId();
- for(int i = 1; i < receivedMsgList.size(); i++){
- if(receivedMsgList.get(i).getLengthOfChain() != maxLength)
- flag = false;
- if(receivedMsgList.get(i).getLengthOfChain() >= maxLength){
- maxLength = receivedMsgList.get(i).getLengthOfChain();
- secondMax.set(max);
- max = receivedMsgList.get(i).getSourceVertexId();
- }
- }
+ /** for each startVertex, sort the node by decreasing order of coverage **/
+
+ /** process similarSet, keep the unchanged set and deleted set & add coverage to unchange node **/
+
+ /** send message to the unchanged set for updating coverage & send kill message to the deleted set **/
//send unchange or merge Message to node with largest length
if(flag == true){
//1. send unchange Message to node with largest length
@@ -367,7 +173,7 @@
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
Client.run(args, job);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index d4a2355..f201fb1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -66,10 +66,10 @@
* get destination vertex
*/
public KmerBytesWritable getNextDestVertexId(VertexValueWritable value) {
- if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ if (value.getFFList().getCountOfPosition() > 0){ //#FFList() > 0
kmerIterator = value.getFFList().iterator();
return kmerIterator.next();
- } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
+ } else if (value.getFRList().getCountOfPosition() > 0){ //#FRList() > 0
kmerIterator = value.getFRList().iterator();
return kmerIterator.next();
} else {
@@ -77,11 +77,11 @@
}
}
- public KmerBytesWritable getPreDestVertexId(VertexValueWritable value) {
- if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ public KmerBytesWritable getPrevDestVertexId(VertexValueWritable value) {
+ if (value.getRFList().getCountOfPosition() > 0){ //#RFList() > 0
kmerIterator = value.getRFList().iterator();
return kmerIterator.next();
- } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
+ } else if (value.getRRList().getCountOfPosition() > 0){ //#RRList() > 0
kmerIterator = value.getRRList().iterator();
return kmerIterator.next();
} else {
@@ -109,7 +109,7 @@
}
- public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
+ public KmerBytesWritable getPrevDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
kmerIterator = value.getRFList().iterator();
outFlag &= MessageFlag.DIR_CLEAR;
@@ -189,7 +189,7 @@
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getPreDestVertexId(getVertexValue()));
+ destVertexId.set(getPrevDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -350,8 +350,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- if(getPreDestVertexId(getVertexValue()) != null)
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ if(getPrevDestVertexId(getVertexValue()) != null)
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
break;
}
}
@@ -494,7 +494,7 @@
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ sendMsg(getPrevDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index 6e3517b..fc78c34 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -96,7 +96,7 @@
}
//send wantToMerge to prev
- tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ tmpKmer = getPrevDestVertexIdAndSetFlag(getVertexValue());
if(tmpKmer != null){
destVertexId.set(tmpKmer);
outgoingMsg.setFlag(outFlag);