adapt BridgeAddVertex to new graph construction
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index fa353d0..a6b5169 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -4,6 +4,7 @@
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
@@ -47,7 +48,7 @@
* Naive Algorithm for path merge graph
*/
public class BridgeAddVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
@@ -68,34 +69,33 @@
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
- getVertexValue().getFFList().append(3, (byte)1);
+ if(getVertexId().toString().equals("ATA")){
+ KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+ vertexId.setByRead("GTA".getBytes(), 0);
+ getVertexValue().getFRList().append(vertexId);
//add bridge vertex
@SuppressWarnings("rawtypes")
Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
vertex.getMsgList().clear();
vertex.getEdges().clear();
- PositionWritable vertexId = new PositionWritable();
VertexValueWritable vertexValue = new VertexValueWritable();
/**
* set the src vertex id
*/
- vertexId.set(3, (byte)1);
vertex.setVertexId(vertexId);
/**
* set the vertex value
*/
- byte[] array = { 'T', 'A', 'G', 'C', 'C'};
- KmerBytesWritable kmer = new KmerBytesWritable(array.length);
- kmer.setByRead(array, 0);
- vertexValue.setKmer(kmer);
- PositionListWritable plist = new PositionListWritable();
- plist.append(new PositionWritable(1, (byte)2));
- vertexValue.setRRList(plist);
- PositionListWritable plist2 = new PositionListWritable();
- plist2.append(new PositionWritable(2, (byte)2));
- vertexValue.setFFList(plist2);
+ KmerListWritable kmerFRList = new KmerListWritable(kmerSize);
+ kmerFRList.append(getVertexId());
+ vertexValue.setFRList(kmerFRList);
+ KmerBytesWritable otherVertexId = new KmerBytesWritable(kmerSize);
+ otherVertexId.setByRead("ACG".getBytes(), 0);
+ KmerListWritable kmerRFList = new KmerListWritable(kmerSize);
+ kmerRFList.append(otherVertexId);
+ vertexValue.setRFList(kmerRFList);
+ vertexValue.setKmer(vertexId);
vertex.setVertexValue(vertexValue);
addVertex(vertexId, vertex);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 2314245..912cfa5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -5,12 +5,10 @@
import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.GeneCode;
@@ -168,19 +166,47 @@
}
/**
+ * tip send message with sourceId and dir to previous node
+ * tip only has one incoming
+ */
+ public void sendSettledMsgToPreviousNode(){
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
+ * tip send message with sourceId and dir to next node
+ * tip only has one outgoing
+ */
+ public void sendSettledMsgToNextNode(){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getPreDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
* head send message to all previous nodes
*/
public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
kmerIterator = value.getRFList().iterator(); // RFList
while(kmerIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getRRList().iterator(); // RRList
while(kmerIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -193,14 +219,14 @@
public void sendSettledMsgToAllNextNodes(VertexValueWritable value) {
kmerIterator = value.getFFList().iterator(); // FFList
while(kmerIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
kmerIterator = value.getFRList().iterator(); // FRList
while(kmerIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(kmerIterator.next());
sendMsg(destVertexId, outgoingMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index c1138de..bee2008 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -10,7 +10,6 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
@@ -66,35 +65,26 @@
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFF);
- else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMFR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getNextDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToPreviousNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfKmer() <= length){
- if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getPreDestVertexId(getVertexValue()));
- sendMsg(destVertexId, outgoingMsg);
+ sendSettledMsgToNextNode();
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfKmer() > length)
+ if(getVertexValue().getLengthOfKmer() <= length)
deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
- responseToDeadVertex(msgIterator);
+ if(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseToDeadVertex();
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index b7f6a4f..5a0591d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,6 +1,5 @@
package edu.uci.ics.genomix.pregelix.util;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;