add subclass BasicVertex and fixing p2
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 2c64139..e8b241c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -37,6 +37,11 @@
public static final byte DIR_RR = 0b11 << 0;
public static final byte DIR_MASK = 0b11 << 0;
}
+
+ public static class MergeDirFlag extends DirectionFlag{
+ public static final byte SHOULD_MERGEWITHNEXT = 0b000 << 0;
+ public static final byte SHOULD_MERGEWITHPREV = 0b001 << 0;
+ }
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index cf254d1..f9a3444 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -70,7 +70,7 @@
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setKmer(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 95e24be..32baccd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -41,7 +41,7 @@
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getMergeChain());
+ vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
getRecordWriter().write(node, nul);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index d10e625..b9af850 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -73,7 +73,7 @@
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setKmer(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index f9f3749..7a1bdfa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -41,7 +41,7 @@
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getMergeChain());
+ vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
getRecordWriter().write(node, nullWritable);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 15f8e96..104fe84 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -15,39 +15,39 @@
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
private byte state;
- private KmerBytesWritable mergeChain;
+ private KmerBytesWritable kmer;
private PositionWritable mergeDest;
public VertexValueWritable() {
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
state = State.NON_VERTEX;
- mergeChain = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable(0);
mergeDest = new PositionWritable();
}
public VertexValueWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
- byte state, KmerBytesWritable mergeChain) {
+ byte state, KmerBytesWritable kmer) {
set(forwardForwardList, forwardReverseList,
reverseForwardList, reverseReverseList,
- state, mergeChain);
+ state, kmer);
}
public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
- byte state, KmerBytesWritable mergeChain) {
+ byte state, KmerBytesWritable kmer) {
this.incomingList.setForwardList(reverseForwardList);
this.incomingList.setReverseList(reverseReverseList);
this.outgoingList.setForwardList(forwardForwardList);
this.outgoingList.setReverseList(forwardReverseList);
this.state = state;
- this.mergeChain.set(mergeChain);
+ this.kmer.set(kmer);
}
public void set(VertexValueWritable value) {
set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
- value.getMergeChain());
+ value.getKmer());
}
public PositionListWritable getFFList() {
@@ -106,24 +106,16 @@
this.state = state;
}
- public int getLengthOfMergeChain() {
- return mergeChain.getKmerLength();
+ public int getLengthOfKmer() {
+ return kmer.getKmerLength();
}
- public KmerBytesWritable getMergeChain() {
- return mergeChain;
- }
-
- public void setMergeChain(KmerBytesWritable mergeChain) {
- this.mergeChain.set(mergeChain);
- }
-
public KmerBytesWritable getKmer() {
- return mergeChain;
+ return kmer;
}
- public void setKmer(KmerBytesWritable mergeChain) {
- this.mergeChain.set(mergeChain);
+ public void setKmer(KmerBytesWritable kmer) {
+ this.kmer.set(kmer);
}
public PositionWritable getMergeDest() {
@@ -139,7 +131,7 @@
incomingList.readFields(in);
outgoingList.readFields(in);
state = in.readByte();
- mergeChain.readFields(in);
+ kmer.readFields(in);
mergeDest.readFields(in);
}
@@ -148,7 +140,7 @@
incomingList.write(out);
outgoingList.write(out);
out.writeByte(state);
- mergeChain.write(out);
+ kmer.write(out);
mergeDest.write(out);
}
@@ -165,7 +157,7 @@
sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
sbuilder.append(incomingList.getForwardList().toString()).append('\t');
sbuilder.append(incomingList.getReverseList().toString()).append('\t');
- sbuilder.append(mergeChain.toString()).append(')');
+ sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 8525a0a..ca42f4e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -89,7 +89,7 @@
byte[] array = { 'T', 'A', 'G', 'C', 'C', 'T'};
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)2));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index d05a00b..cf312db 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -248,7 +248,7 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
broadcaseKillself();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index 63ac5b5..940d149 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -85,7 +85,7 @@
byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)2));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 6b7bc5d..cd449b5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -194,7 +194,7 @@
outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -217,7 +217,7 @@
for(PositionWritable prevId : receivedMsgMap.keySet()){
receivedMsgList = receivedMsgMap.get(prevId);
if(receivedMsgList.size() > 1){
- //find the node with largest length of mergeChain
+ //find the node with largest length of Kmer
boolean flag = true; //the same length
int maxLength = receivedMsgList.get(0).getLengthOfChain();
PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
@@ -279,7 +279,7 @@
broadcaseKillself();
} else if (incomingMsg.getMessage() == AdjMessage.MERGE){
//merge with small node
- getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
incomingMsg.getChainVertexId()));
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
new file mode 100644
index 0000000..61bb08f
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -0,0 +1,420 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class BasicPathMergeVertex extends
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BasicPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "BasicPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ protected int maxIteration = -1;
+
+ protected MessageWritable incomingMsg = new MessageWritable();
+ protected MessageWritable outgoingMsg = new MessageWritable();
+ protected PositionWritable destVertexId = new PositionWritable();
+ protected Iterator<PositionWritable> posIterator;
+ protected byte outFlag;
+
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ outFlag = (byte)0;
+ outgoingMsg.reset();
+ }
+
+ /**
+ * get destination vertex
+ */
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ outFlag |= MessageFlag.DIR_FF;
+ }
+ else{ // #FRList() > 0
+ posIterator = value.getFRList().iterator();
+ outFlag |= MessageFlag.DIR_FR;
+ }
+ return posIterator.next();
+ }
+
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+ if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ outFlag |= MessageFlag.DIR_RF;
+ }
+ else{ // #RRList() > 0
+ posIterator = value.getRRList().iterator();
+ outFlag |= MessageFlag.DIR_RR;
+ }
+ return posIterator.next();
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * one vertex send message to previous and next vertices (neighbor)
+ */
+ public void sendMsgToAllNeighborNodes(VertexValueWritable value){
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg() {
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsgToAllNextNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsgToAllPreviousNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ }
+
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<MessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!VertexUtil.isPathVertex(getVertexValue())
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+ msgIterator.next();
+ voteToHalt();
+ } else {
+ incomingMsg = msgIterator.next();
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ }
+ }
+ }
+
+ /**
+ * check if A need to be flipped with successor
+ */
+ public boolean ifFilpWithSuccessor(){
+ if(getVertexValue().getFRList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * check if A need to be filpped with predecessor
+ */
+ public boolean ifFlipWithPredecessor(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * set adjMessage to successor(from predecessor)
+ */
+ public void setSuccessorAdjMsg(){
+ if(getVertexValue().getFFList().getLength() > 0)
+ outFlag |= MessageFlag.DIR_FF;
+ else
+ outFlag |= MessageFlag.DIR_FR;
+ }
+
+ /**
+ * set adjMessage to predecessor(from successor)
+ */
+ public void setPredecessorAdjMsg(){
+ if(getVertexValue().getRFList().getLength() > 0)
+ outFlag |= MessageFlag.DIR_RF;
+ else
+ outFlag |= MessageFlag.DIR_RF;
+ }
+
+ /**
+ * send update message to neighber
+ * @throws IOException
+ */
+ public void broadcastUpdateMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ outFlag |= MessageFlag.IS_HEAD;
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * send merge message to neighber for P2
+ * @throws IOException
+ */
+ public void sendMergeMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+ byte newState = getVertexValue().getState();
+ newState &= ~MessageFlag.IS_HEAD;
+ newState |= MessageFlag.IS_OLDHEAD;
+ getVertexValue().setState(newState);
+ outFlag |= MessageFlag.IS_HEAD;
+ }
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * send merge message to neighber for P4
+ * @throws IOException
+ */
+ public void broadcastMergeMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ outFlag |= MessageFlag.IS_HEAD;
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
+ */
+ public void sendUpMsgFromPredecessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ if(getVertexValue().getFFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * This vertex tries to merge with next vertex and send update msg to neighber
+ * @throws IOException
+ */
+ public void sendUpMsgFromSuccessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
+ if(getVertexValue().getRFList().getLength() > 0)
+ getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * Returns the edge dir for B->A when the A->B edge is type @dir
+ */
+ public byte mirrorDirection(byte dir) {
+ switch (dir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RF;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_FF;
+ default:
+ throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
+ }
+ }
+
+ /**
+ * check if need filp
+ */
+ public byte flipDirection(byte neighborDir, boolean flip){
+ if(flip){
+ switch (neighborDir) {
+ case MessageFlag.DIR_FF:
+ return MessageFlag.DIR_FR;
+ case MessageFlag.DIR_FR:
+ return MessageFlag.DIR_FF;
+ case MessageFlag.DIR_RF:
+ return MessageFlag.DIR_RR;
+ case MessageFlag.DIR_RR:
+ return MessageFlag.DIR_RF;
+ default:
+ throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+ }
+ } else
+ return neighborDir;
+ }
+
+ /**
+ * updateAdjList
+ */
+ public void processUpdate(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
+ }
+
+ /**
+ * merge and updateAdjList
+ */
+ public void processMerge(){
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ boolean flip;
+ if((outFlag & MessageFlag.FLIP) > 0)
+ flip = true;
+ else
+ flip = false;
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
+
+ getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+ kmerSize, incomingMsg.getChainVertexId());
+ }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 992dbaa..50354c7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -12,6 +12,7 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -46,20 +47,12 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
+ BasicPathMergeVertex {
private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ byte headFlag;
/**
* initiate kmerSize, maxIteration
*/
@@ -68,153 +61,42 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ headFlag = (byte)0;
outgoingMsg.reset();
}
/**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getFlag() == Message.START) {
- getVertexValue().setState(State.START_VERTEX);
- //getVertexValue().setMergeChain(null);
- } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(State.END_VERTEX);
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
- voteToHalt();
- } else
- voteToHalt();
- }
-
- /**
* head send message to path
*/
public void sendOutMsg() {
- if (getVertexValue().getState() == State.START_VERTEX) {
- outgoingMsg.setFlag(Message.START);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- } else if (getVertexValue().getState() != State.END_VERTEX) {
- outgoingMsg.setFlag(Message.NON);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- }
+ //send wantToMerge to next
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+
+ ////send wantToMerge to prev
+ destVertexId.set(getPreDestVertexId(getVertexValue()));
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
}
/**
* head send message to path
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
- if (getSuperstep() == 3) {
- sendOutMsg();
+ if(getSuperstep() == 3){
+ headFlag = (byte)(getVertexValue().getState() | MessageFlag.IS_HEAD);
+ if(headFlag == 0)
+ sendOutMsg();
} else {
- if (msgIterator.hasNext()) {
+ while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (mergeChainVertex()) {
- if (incomingMsg.getFlag() == Message.END) {
- if (getVertexValue().getState() == State.START_VERTEX) {
- getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
- //System.out.println();
- } else
- getVertexValue().setState(State.END_VERTEX);
- } else
- sendOutMsg();
- }
+ processMerge();
+ headFlag = (byte)(incomingMsg.getFlag() | MessageFlag.IS_HEAD);
+ if(headFlag > 0)
+ getVertexValue().setState(MessageFlag.IS_HEAD);
}
}
}
@@ -223,20 +105,7 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- if (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setFlag(Message.END);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-
- if (incomingMsg.getFlag() == Message.START)
- deleteVertex(getVertexId());
- } else {
- if (getVertexValue().getState() != State.START_VERTEX && getVertexValue().getState() != State.END_VERTEX)
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
- }
+ sendMergeMsg();
}
/**
@@ -246,10 +115,10 @@
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getChainVertexId()));
- KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer);
- getVertexValue().setMergeChain(chainVertexId);
+ KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer);
+ getVertexValue().setKmer(chainVertexId);
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getMergeChain()),
+ if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getKmer()),
chainVertexId, kmerSize)) {
getVertexValue().setState(State.CYCLE);
return false;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index e9d359f..e8fb61a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -186,7 +186,7 @@
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
+ getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -209,7 +209,7 @@
} else {
mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
}
}
@@ -222,7 +222,7 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
destVertexId.set(incomingMsg.getSourceVertexId());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index c9762f2..3e1b87e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -232,8 +232,8 @@
public void mergeChainVertex(){
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(
- kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ getVertexValue().setKmer(
+ kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -258,7 +258,7 @@
} else {
mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
}
}
@@ -271,7 +271,7 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -305,7 +305,7 @@
if (getVertexValue().getState() == State.START_VERTEX
&& incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
&& incomingMsg.getFlag() == Message.STOP)
@@ -325,7 +325,7 @@
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.PSEUDOREAR)
outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State.END_VERTEX)
@@ -347,7 +347,7 @@
if (getVertexValue().getState() == State.START_VERTEX
&& incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
&& incomingMsg.getFlag() == Message.STOP)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 39fe619..2457c1c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -15,7 +15,6 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -52,13 +51,9 @@
* Naive Algorithm for path merge graph
*/
public class P4ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P4ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P4ForPathMergeVertex.iteration";
+ BasicPathMergeVertex {
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- public static int kmerSize = -1;
- private int maxIteration = -1;
private static long randSeed = -1;
private float probBeingRandomHead = -1;
@@ -74,14 +69,8 @@
private boolean prevHead;
private byte headFlag;
private byte tailFlag;
- private byte outFlag;
private byte selfFlag;
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
-
/**
* initiate kmerSize, maxIteration
*/
@@ -144,306 +133,6 @@
return false;
}
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag(MessageFlag.IS_HEAD);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- getVertexValue().setState(MessageFlag.IS_HEAD);
- }
- }
- }
-
- /**
- * check if A need to be flipped with successor
- */
- public boolean ifFilpWithSuccessor(){
- if(getVertexValue().getFRList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * check if A need to be filpped with predecessor
- */
- public boolean ifFlipWithPredecessor(){
- if(getVertexValue().getRFList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorAdjMsg(){
- if(getVertexValue().getFFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_FF;
- else
- outFlag |= MessageFlag.DIR_FR;
- }
-
- /**
- * set adjMessage to predecessor(from successor)
- */
- public void setPredecessorAdjMsg(){
- if(getVertexValue().getRFList().getLength() > 0)
- outFlag |= MessageFlag.DIR_RF;
- else
- outFlag |= MessageFlag.DIR_RF;
- }
-
- /**
- * send update message to neighber
- * @throws IOException
- */
- public void broadcastUpdateMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
- outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & 0b0001){
- case MessageFlag.SHOULD_MERGEWITHPREV:
- setSuccessorAdjMsg();
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- break;
- case MessageFlag.SHOULD_MERGEWITHNEXT:
- setPredecessorAdjMsg();
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- break;
- }
- }
-
- /**
- * send merge message to neighber
- * @throws IOException
- */
- public void broadcastMergeMsg(){
- if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
- outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & 0b0001){
- case MessageFlag.SHOULD_MERGEWITHNEXT:
- setSuccessorAdjMsg();
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- break;
- case MessageFlag.SHOULD_MERGEWITHPREV:
- setPredecessorAdjMsg();
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
- break;
- }
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromPredecessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHNEXT;
- getVertexValue().setState(state);
- if(getVertexValue().getFFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- * @throws IOException
- */
- public void sendUpMsgFromSuccessor(){
- byte state = getVertexValue().getState();
- state |= MessageFlag.SHOULD_MERGEWITHPREV;
- getVertexValue().setState(state);
- if(getVertexValue().getRFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * Returns the edge dir for B->A when the A->B edge is type @dir
- */
- public byte mirrorDirection(byte dir) {
- switch (dir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RF;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_FF;
- default:
- throw new RuntimeException("Unrecognized direction in flipDirection: " + dir);
- }
- }
-
- /**
- * check if need filp
- */
- public byte flipDirection(byte neighborDir, boolean flip){
- if(flip){
- switch (neighborDir) {
- case MessageFlag.DIR_FF:
- return MessageFlag.DIR_FR;
- case MessageFlag.DIR_FR:
- return MessageFlag.DIR_FF;
- case MessageFlag.DIR_RF:
- return MessageFlag.DIR_RR;
- case MessageFlag.DIR_RR:
- return MessageFlag.DIR_RF;
- default:
- throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
- }
- } else
- return neighborDir;
- }
-
- /**
- * updateAdjList
- */
- public void processUpdate(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
- }
-
- /**
- * merge and updateAdjList
- */
- public void processMerge(){
- byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-
- boolean flip;
- if((outFlag & MessageFlag.FLIP) > 0)
- flip = true;
- else
- flip = false;
- byte neighborToMergeDir = flipDirection(neighborToMeDir, flip);
-
- getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
- neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getChainVertexId());
- }
-
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -466,10 +155,6 @@
// We prevent merging towards non-path nodes
hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- getVertexValue().setState(outFlag);
- voteToHalt();
- }
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
@@ -518,7 +203,8 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
processMerge();
-
+
+ //head meets head, stop
headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
if((headFlag & selfFlag) > 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 75d53e9..18e7d23 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -244,7 +244,7 @@
getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
} else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(MessageFlag.IS_TAIL);
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
+ getVertexValue().setKmer(getVertexValue().getKmer());
//voteToHalt();
} //else
//voteToHalt();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 59a5efb..668633c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -85,7 +85,7 @@
byte[] array = { 'G', 'A', 'A'};
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)4));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 62f941f..dec2d48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -95,7 +95,7 @@
initVertex();
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
if(getVertexValue().getFFList().getCountOfPosition() > 0)
outgoingMsg.setFlag(AdjMessage.FROMFF);
else if(getVertexValue().getFRList().getCountOfPosition() > 0)
@@ -107,7 +107,7 @@
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
if(getVertexValue().getRFList().getCountOfPosition() > 0)
outgoingMsg.setFlag(AdjMessage.FROMRF);
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
@@ -119,7 +119,7 @@
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() > length)
+ if(getVertexValue().getLengthOfKmer() > length)
deleteVertex(getVertexId());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index 8669ecd..9c20ce6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -38,7 +38,7 @@
outputValue.setFRList(node.getFRList());
outputValue.setRFList(node.getRFList());
outputValue.setRRList(node.getRRList());
- outputValue.setMergeChain(node.getKmer());
+ outputValue.setKmer(node.getKmer());
outputValue.setState(State.NON_VERTEX);
writer.append(outputKey, outputValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 9dd5162..f31c1d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -60,7 +60,7 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() != -1 && value.getLengthOfMergeChain() <= maxLength) {
+ if (value.getLengthOfKmer() != -1 && value.getLengthOfKmer() <= maxLength) {
bw.write(value.toString());
bw.newLine();
}
@@ -84,7 +84,7 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() != -1 && value.getLengthOfMergeChain() <= maxLength
+ if (value.getLengthOfKmer() != -1 && value.getLengthOfKmer() <= maxLength
&& value.getState() == State.FINAL_VERTEX) {
bw.write(key.toString() + "\t" + value.toString());
bw.newLine();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 90100e9..83614e7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -1,25 +1,21 @@
package edu.uci.ics.genomix.pregelix.type;
-import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
+import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
-public class MessageFlag extends DirectionFlag {
- public static final byte FLIP = 1 << 2;
- public static final byte IS_HEAD = 1 << 3;
- public static final byte IS_TAIL = 1 << 4;
- public static final byte SHOULD_MERGEWITHNEXT = 1 << 5;
- public static final byte SHOULD_MERGEWITHPREV = 1 << 6;
+public class MessageFlag extends MergeDirFlag{
+
+ public static final byte FLIP = 1 << 3;
+ public static final byte IS_HEAD = 1 << 4;
+ public static final byte IS_OLDHEAD = 1 << 5;
+ public static final byte IS_TAIL = 1 << 5;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
case IS_HEAD:
return "IS_HEAD";
- case IS_TAIL:
- return "IS_TAIL";
- case SHOULD_MERGEWITHNEXT:
- return "SHOULD_MERGEWITHNEXT";
- case SHOULD_MERGEWITHPREV:
- return "SHOULD_MERGEWITHPREV";
+ case IS_OLDHEAD:
+ return "IS_OLDHEAD";
case FLIP:
return "FLIP";
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index a4aaa84..7e56c65 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -16,21 +16,19 @@
}
/**
- * Head Vertex: out-degree > 0,
+ * Head Vertex: out-degree > 0
+ */
+ public static boolean isHead(VertexValueWritable value){
+ return value.outDegree() > 0 && !isPathVertex(value);
+ }
+
+ /**
+ * Head Vertex: out-degree > 0, and has indegress
*
* @param vertexValue
*/
public static boolean isHeadVertexWithIndegree(VertexValueWritable value) {
- return value.outDegree() > 0 && !isPathVertex(value) && !isHeadWithoutIndegree(value);
- }
-
- /**
- * Rear Vertex: in-degree > 0,
- *
- * @param vertexValue
- */
- public static boolean isRearVertexWithOutdegree(VertexValueWritable value) {
- return value.inDegree() > 0 && !isPathVertex(value) && !isRearWithoutOutdegree(value);
+ return isHead(value) && !isHeadWithoutIndegree(value);
}
/**
@@ -41,6 +39,23 @@
}
/**
+ * Head Vertex: out-degree > 0
+ */
+ public static boolean isRear(VertexValueWritable value){
+ return value.inDegree() > 0 && !isPathVertex(value);
+ }
+
+ /**
+ * Rear Vertex: in-degree > 0, and has outdegree
+ *
+ * @param vertexValue
+ */
+ public static boolean isRearVertexWithOutdegree(VertexValueWritable value) {
+ return isRear(value) && !isRearWithoutOutdegree(value);
+ }
+
+
+ /**
* Rear Vertex without outdegree: indegree = 1, outdegree = 0
*/
public static boolean isRearWithoutOutdegree(VertexValueWritable value){