update data clean 2013-06-06
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
new file mode 100644
index 0000000..55ddb1b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -0,0 +1,185 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+public class MergeBubbleMessageWritable implements WritableComparable<MergeBubbleMessageWritable> {
+ /**
+ * sourceVertexId stores source vertexId when headVertex sends the message
+ * stores neighber vertexValue when pathVertex sends the message
+ * file stores the point to the file that stores the chains of connected DNA
+ */
+ private PositionWritable sourceVertexId;
+ private KmerBytesWritable chainVertexId;
+ private AdjacencyListWritable neighberNode; //incoming or outgoing
+ private byte message;
+ private PositionWritable startVertexId;
+
+ private byte checkMessage;
+
+ public MergeBubbleMessageWritable() {
+ sourceVertexId = new PositionWritable();
+ chainVertexId = new KmerBytesWritable(0);
+ neighberNode = new AdjacencyListWritable();
+ message = Message.NON;
+ startVertexId = new PositionWritable();
+ checkMessage = (byte) 0;
+ }
+
+ public void set(MessageWritable msg) {
+ checkMessage = 0;
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(msg.getSourceVertexId());
+ }
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(msg.getChainVertexId());
+ }
+ if (neighberNode != null) {
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(msg.getNeighberNode());
+ }
+ this.message = msg.getMessage();
+ }
+
+ public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ checkMessage = 0;
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ }
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
+ if (neighberNode != null) {
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(neighberNode);
+ }
+ this.message = message;
+ }
+
+ public void reset() {
+ checkMessage = 0;
+ chainVertexId.reset(1);
+ neighberNode.reset();
+ message = Message.NON;
+ }
+
+ public PositionWritable getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ public void setSourceVertexId(PositionWritable sourceVertexId) {
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId.getReadID(),sourceVertexId.getPosInRead());
+ }
+ }
+
+ public KmerBytesWritable getChainVertexId() {
+ return chainVertexId;
+ }
+
+ public void setChainVertexId(KmerBytesWritable chainVertexId) {
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
+ }
+
+ public AdjacencyListWritable getNeighberNode() {
+ return neighberNode;
+ }
+
+ public void setNeighberNode(AdjacencyListWritable neighberNode) {
+ if(neighberNode != null){
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(neighberNode);
+ }
+ }
+
+ public int getLengthOfChain() {
+ return chainVertexId.getKmerLength();
+ }
+
+ public PositionWritable getStartVertexId() {
+ return startVertexId;
+ }
+
+ public void setStartVertexId(PositionWritable startVertexId) {
+ if(startVertexId != null){
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(startVertexId);
+ }
+ }
+
+ public byte getMessage() {
+ return message;
+ }
+
+ public void setMessage(byte message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(checkMessage);
+ if ((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.write(out);
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.write(out);
+ if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+ neighberNode.write(out);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.write(out);
+ out.write(message);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.reset();
+ checkMessage = in.readByte();
+ if ((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.readFields(in);
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.readFields(in);
+ if ((checkMessage & CheckMessage.NEIGHBER) != 0)
+ neighberNode.readFields(in);
+ if ((checkMessage & CheckMessage.START) != 0)
+ startVertexId.readFields(in);
+ message = in.readByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return sourceVertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof MergeBubbleMessageWritable) {
+ MergeBubbleMessageWritable tp = (MergeBubbleMessageWritable) o;
+ return sourceVertexId.equals(tp.sourceVertexId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return sourceVertexId.toString();
+ }
+
+ public int compareTo(MergeBubbleMessageWritable tp) {
+ return sourceVertexId.compareTo(tp.sourceVertexId);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index f0a6b58..6a71344 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -31,6 +31,23 @@
message = Message.NON;
checkMessage = (byte) 0;
}
+
+ public void set(MessageWritable msg) {
+ checkMessage = 0;
+ if (sourceVertexId != null) {
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(msg.getSourceVertexId());
+ }
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(msg.getChainVertexId());
+ }
+ if (neighberNode != null) {
+ checkMessage |= CheckMessage.NEIGHBER;
+ this.neighberNode.set(msg.getNeighberNode());
+ }
+ this.message = msg.getMessage();
+ }
public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
checkMessage = 0;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 9a46a67..8716d8d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.pregelix.operator.bridgeremove;
+import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
@@ -11,6 +12,8 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
* vertexId: BytesWritable
@@ -46,31 +49,137 @@
public class BridgeRemoveVertex extends
Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
- public static final String ITERATIONS = "BridgeRemoveVertex.iteration";
+ public static final String LENGTH = "BridgeRemoveVertex.length";
public static int kmerSize = -1;
- private int maxIteration = -1;
+ private int length = -1;
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
-
+ private ArrayList<MessageWritable> receivedMsg = new ArrayList<MessageWritable>();
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
/**
* initiate kmerSize, maxIteration
*/
public void initVertex() {
if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(length == -1)
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
outgoingMsg.reset();
+ receivedMsg.clear();
+ }
+
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(ValueStateWritable value) {
+ posIterator = value.getRFList().iterator(); // RFList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getRRList().iterator(); // RRList
+ while(posIterator.hasNext()){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
}
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
-
- }
+ if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+ sendMsgToAllNextNodes(getVertexValue());
+ }
+ else if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+ sendMsgToAllPreviousNodes(getVertexValue());
+ }
+ }
+ else if (getSuperstep() == 2){
+ int i = 0;
+ while (msgIterator.hasNext()) {
+ if(i == 3)
+ break;
+ receivedMsg.add(msgIterator.next());
+ i++;
+ }
+ if(receivedMsg.size() == 2){
+ if(getVertexValue().getLengthOfMergeChain() > length){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ if(receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ }
+ }
+ }
+ }
+ else if(getSuperstep() == 3){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ }
+ }
+ }
+ voteToHalt();
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index a9c9c5b..e16cd20 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -1,6 +1,10 @@
package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -9,8 +13,10 @@
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.MergeBubbleMessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
* vertexId: BytesWritable
@@ -44,15 +50,19 @@
* Naive Algorithm for path merge graph
*/
public class BubbleMergeVertex extends
- Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MergeBubbleMessageWritable> {
public static final String KMER_SIZE = "BubbleMergeVertex.kmerSize";
public static final String ITERATIONS = "BubbleMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
+ private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
+ private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+ private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsg = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
+ private ArrayList<MergeBubbleMessageWritable> tmpMsg = new ArrayList<MergeBubbleMessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -64,13 +74,121 @@
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
outgoingMsg.reset();
}
+ /**
+ * get destination vertex
+ */
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
+ return posIterator.next();
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(ValueStateWritable value) {
+ posIterator = value.getFFList().iterator(); // FFList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ posIterator = value.getFRList().iterator(); // FRList
+ while(posIterator.hasNext()){
+ destVertexId.set(posIterator.next());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+
@Override
- public void compute(Iterator<MessageWritable> msgIterator) {
+ public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
-
- }
+ if(VertexUtil.isHeadVertex(getVertexValue())){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsgToAllNextNodes(getVertexValue());
+ }
+ } else if (getSuperstep() == 2){
+ while (msgIterator.hasNext()) {
+ if(VertexUtil.isPathVertex(getVertexValue())){
+ outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ } else if (getSuperstep() == 3){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(!receivedMsg.containsKey(incomingMsg.getStartVertexId())){
+ tmpMsg.clear();
+ tmpMsg.add(incomingMsg);
+ receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ }
+ else{
+ tmpMsg.clear();
+ tmpMsg.addAll(receivedMsg.get(incomingMsg.getStartVertexId()));
+ tmpMsg.add(incomingMsg);
+ receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ }
+ }
+ for(PositionWritable prevId : receivedMsg.keySet()){
+ tmpMsg = receivedMsg.get(prevId);
+ if(tmpMsg.size() > 1){
+ //find the node with largest length of mergeChain
+ boolean flag = true; //the same length
+ int maxLength = tmpMsg.get(0).getLengthOfChain();
+ PositionWritable max = tmpMsg.get(0).getSourceVertexId();
+ for(int i = 1; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getLengthOfChain() != maxLength)
+ flag = false;
+ if(tmpMsg.get(i).getLengthOfChain() > maxLength){
+ maxLength = tmpMsg.get(i).getLengthOfChain();
+ max = tmpMsg.get(i).getSourceVertexId();
+ }
+ }
+ //send merge or unchange Message to node with largest length
+ if(flag == true){
+ //send unchange Message to node with largest length
+ //we can send no message to complete this step
+ //send delete Message to node which doesn't have largest length
+ for(int i = 0; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ outgoingMsg.setMessage(AdjMessage.KILL);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ } else {
+ outgoingMsg.setMessage(AdjMessage.UNCHANGE);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ }
+ }
+ } else{
+ //send merge Message to node with largest length
+ for(int i = 0; i < tmpMsg.size(); i++){
+ if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ outgoingMsg.setMessage(AdjMessage.KILL);
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ } else {
+ outgoingMsg.setMessage(AdjMessage.MERGE);
+ /* add other node in message */
+ sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ }
+ }
+ }
+ }
+ }
+ } else if (getSuperstep() == 4){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.KILL){
+ deleteVertex(getVertexId());
+ } else if (incomingMsg.getMessage() == AdjMessage.MERGE){
+ //merge with small node
+ }
+ }
+ }
+ voteToHalt();
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 534da5c..4a174ec 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -11,7 +11,7 @@
import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
@@ -68,26 +68,46 @@
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
- initVertex(); //getVertexValue().getLengthOfMergeChain() < length
+ initVertex();
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
if(getVertexValue().getLengthOfMergeChain() > length){
- if(getVertexValue().getOutgoingList().getCountOfPosition() != 0){
- if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(Message.TOFORWARD);
- else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(Message.TOREVERSE);
- outgoingMsg.setSourceVertexId(getVertexId());
- }
+ if(getVertexValue().getFFList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ deleteVertex(getVertexId());
}
-
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
-
+ if(getVertexValue().getLengthOfMergeChain() > length){
+ if(getVertexValue().getRFList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0)
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ deleteVertex(getVertexId());
+ }
+ }
+ else if(VertexUtil.isSingleVertex(getVertexValue())){
+ if(getVertexValue().getLengthOfMergeChain() > length)
+ deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
-
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ }
+ }
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
new file mode 100644
index 0000000..ca8d795
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class AdjMessage {
+ public static final byte FROMFF = 0;
+ public static final byte FROMFR = 1;
+ public static final byte FROMRF = 2;
+ public static final byte FROMRR = 3;
+ public static final byte NON = 4;
+ public static final byte UNCHANGE = 5;
+ public static final byte MERGE = 6;
+ public static final byte KILL = 7;
+
+ public final static class ADJMESSAGE_CONTENT {
+ public static String getContentFromCode(byte code) {
+ String r = "";
+ switch (code) {
+ case FROMFF:
+ r = "FROMFF";
+ break;
+ case FROMFR:
+ r = "FROMFR";
+ break;
+ case FROMRF:
+ r = "FROMRF";
+ break;
+ case FROMRR:
+ r = "FROMRR";
+ break;
+ case NON:
+ r = "NON";
+ break;
+ case UNCHANGE:
+ r = "UNCHANGE";
+ break;
+ case MERGE:
+ r = "MERGE";
+ break;
+ case KILL:
+ r = "KILL";
+ break;
+ }
+ return r;
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 6e6a97a..c7bcf48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -8,6 +8,7 @@
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
public static final byte LASTGENECODE = 1 << 5;
+ public static final byte START = 1 << 6;
public final static class CheckMessage_CONTENT {
@@ -32,6 +33,9 @@
case LASTGENECODE:
r = "LASTGENECODE";
break;
+ case START:
+ r = "START";
+ break;
}
return r;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index b97f0bb..4644383 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -8,8 +8,8 @@
public static final byte STOP = 3;
public static final byte FROMPSEUDOHEAD = 4;
public static final byte FROMPSEUDOREAR = 5;
- public static final byte TOFORWARD = 6;
- public static final byte TOREVERSE = 7;
+ public static final byte IN = 6;
+ public static final byte OUT = 7;
public final static class MESSAGE_CONTENT {
@@ -34,11 +34,11 @@
case FROMPSEUDOREAR:
r = "FROMPSEUDOREAR";
break;
- case TOFORWARD:
- r = "TOFORWARD";
+ case IN:
+ r = "IN";
break;
- case TOREVERSE:
- r = "TOREVERSE";
+ case OUT:
+ r = "OUT";
break;
}
return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 19839c7..772690d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -58,4 +58,25 @@
public static boolean isOutgoingTipVertex(ValueStateWritable value){
return value.inDegree() == 1 && value.outDegree() == 0;
}
+
+ /**
+ * check if vertex is single
+ */
+ public static boolean isSingleVertex(ValueStateWritable value){
+ return value.inDegree() == 0 && value.outDegree() == 0;
+ }
+
+ /**
+ * check if vertex is upbridge
+ */
+ public static boolean isUpBridgeVertex(ValueStateWritable value){
+ return value.inDegree() == 1 && value.outDegree() > 1;
+ }
+
+ /**
+ * check if vertex is downbridge
+ */
+ public static boolean isDownBridgeVertex(ValueStateWritable value){
+ return value.inDegree() > 1 && value.outDegree() == 1;
+ }
}