start coding SplitRepeat
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index ba374be..7fe27e6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
public class MessageWritable implements WritableComparable<MessageWritable> {
/**
@@ -19,6 +20,7 @@
private KmerBytesWritable sourceVertexId;
private KmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
+ private PositionListWritable nodeIdList = new PositionListWritable();
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
@@ -158,6 +160,14 @@
this.updateMsg = updateMsg;
}
+ public PositionListWritable getNodeIdList() {
+ return nodeIdList;
+ }
+
+ public void setNodeIdList(PositionListWritable nodeIdList) {
+ this.nodeIdList.set(nodeIdList);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -168,6 +178,8 @@
kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
+ if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
+ nodeIdList.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
out.writeBoolean(updateMsg);
@@ -184,6 +196,8 @@
kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
+ if ((checkMessage & CheckMessage.NODEIDLIST) != 0)
+ nodeIdList.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
updateMsg = in.readBoolean();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index f67cac6..c6ff206 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -244,6 +244,10 @@
return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
}
+ public int getDegree(){
+ return inDegree() + outDegree();
+ }
+
/*
* Delete the corresponding edge
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 8bddd6b..ca91edb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -163,23 +163,25 @@
* one vertex send message to previous and next vertices (neighbor)
*/
public void sendMsgToAllNeighborNodes(VertexValueWritable value){
- posIterator = value.getFFList().iterator(); // FFList
+ sendMsgToAllNextNodes(value);
+ sendMsgToAllPreviousNodes(value);
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
+ outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
- posIterator = value.getFRList().iterator(); // FRList
+ posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
@@ -204,26 +206,6 @@
sendMsg(destVertexId, outgoingMsg);
}
}
-
- /**
- * head send message to all previous nodes
- */
- public void sendSettledMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRF);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- outgoingMsg.setFlag(AdjMessage.FROMRR);
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
/**
* start sending message
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
new file mode 100644
index 0000000..52db6b4
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.genomix.pregelix.operator.splitrepeat;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+
+public class SplitRepeatVertex extends
+ BasicGraphCleanVertex{
+
+ Map<KmerBytesWritable, PositionListWritable> kmerMap = new HashMap<KmerBytesWritable, PositionListWritable>();
+ String[][] connectedTable = new String[][]{
+ {"FF", "RF"},
+ {"FF", "RR"},
+ {"FR", "RF"},
+ {"FR", "RR"}
+ };
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ if(getSuperstep() == 1){
+ if(getVertexValue().getDegree() > 2){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsgToAllNeighborNodes(getVertexValue());
+ }
+ voteToHalt();
+ } else if(getSuperstep() == 2){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ outgoingMsg.setNodeIdList(getVertexValue().getNodeIdList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ }
+ voteToHalt();
+ } else if(getSuperstep() == 3){
+ kmerMap.clear();
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ kmerMap.put(incomingMsg.getSourceVertexId(), incomingMsg.getNodeIdList());
+ }
+ /** process connectedTable **/
+ for(int i = 0; i < 4; i++){
+ getVertexValue().getFFList();
+ }
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 4aee8a0..73ab533 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -6,7 +6,7 @@
public static final byte CHAIN = 1 << 1;
public static final byte NEIGHBER = 1 << 2;
public static final byte MESSAGE = 1 << 3;
- public static final byte STATE = 1 << 4;
+ public static final byte NODEIDLIST = 1 << 4;
public static final byte ADJMSG = 1 << 5;
public static final byte START = 1 << 6;
@@ -27,8 +27,8 @@
case MESSAGE:
r = "MESSAGE";
break;
- case STATE:
- r = "STATE";
+ case NODEIDLIST:
+ r = "READID";
break;
case ADJMSG:
r = "ADJMSG";