add test cases for removeBridge, removeTip and mergeBubble
diff --git a/genomix/genomix-pregelix/data/TipAddGraph/txt/test b/genomix/genomix-pregelix/data/TipAddGraph/txt/test
deleted file mode 100755
index 5c36d0a..0000000
--- a/genomix/genomix-pregelix/data/TipAddGraph/txt/test
+++ /dev/null
@@ -1,7 +0,0 @@
-(1,5) ([(1,6)] [] [] [(1,4)] GAA)
-(2,1) ([] [] [] [(1,4)] )
-(1,1) ([(1,2)] [] [] [] AAT)
-(1,2) ([(1,3)] [] [] [(1,1)] ATA)
-(1,4) ([(1,5),(2,1)] [] [] [(1,3)] AGA)
-(1,3) ([(1,4)] [] [] [(1,2)] TAG)
-(1,6) ([] [] [] [(1,5)] AAC)
diff --git a/genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc b/genomix/genomix-pregelix/data/bridgeremove/txt/.bridgeremove.crc
similarity index 100%
copy from genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc
copy to genomix/genomix-pregelix/data/bridgeremove/txt/.bridgeremove.crc
diff --git a/genomix/genomix-pregelix/data/bridgeremove/txt/bridgeremove b/genomix/genomix-pregelix/data/bridgeremove/txt/bridgeremove
new file mode 100755
index 0000000..e5fb406
--- /dev/null
+++ b/genomix/genomix-pregelix/data/bridgeremove/txt/bridgeremove
@@ -0,0 +1,7 @@
+(2,1)([(2,2)] [] [] [] GCC)
+(1,1)([(1,2)] [] [] [] AAT)
+(2,2)([(2,3)] [] [] [(2,1),(3,1)] CCT)
+(1,2)([(1,3),(3,1)] [] [] [(1,1)] ATA)
+(2,3)([] [] [] [(2,2)] CTAGCA)
+(1,3)([] [] [] [(1,2)] TAGAAC)
+(3,1)([(2,2)] [] [] [(1,2)] TAGCCT)
diff --git a/genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc b/genomix/genomix-pregelix/data/bubble_merge/txt/.bubble_merge.crc
similarity index 100%
copy from genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc
copy to genomix/genomix-pregelix/data/bubble_merge/txt/.bubble_merge.crc
diff --git a/genomix/genomix-pregelix/data/bubble_merge/txt/bubble_merge b/genomix/genomix-pregelix/data/bubble_merge/txt/bubble_merge
new file mode 100755
index 0000000..22d58bf
--- /dev/null
+++ b/genomix/genomix-pregelix/data/bubble_merge/txt/bubble_merge
@@ -0,0 +1,6 @@
+(1,5)([] [] [] [(1,4)] GAAC)
+(2,1)([(1,4)] [] [] [(1,2)] TAGCCAG)
+(1,1)([(1,2)] [] [] [] AAT)
+(1,2)([(1,3),(2,1)] [] [] [(1,1)] ATA)
+(1,4)([(1,5)] [] [] [(1,3),(2,1)] AGA)
+(1,3)([(1,4)] [] [] [(1,2)] TAG)
diff --git a/genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc b/genomix/genomix-pregelix/data/bubble_remove/txt/.bubble.crc
similarity index 100%
rename from genomix/genomix-pregelix/data/TipAddGraph/txt/.test.crc
rename to genomix/genomix-pregelix/data/bubble_remove/txt/.bubble.crc
diff --git a/genomix/genomix-pregelix/data/bubble_remove/txt/bubble b/genomix/genomix-pregelix/data/bubble_remove/txt/bubble
new file mode 100755
index 0000000..d250792
--- /dev/null
+++ b/genomix/genomix-pregelix/data/bubble_remove/txt/bubble
@@ -0,0 +1,6 @@
+(1,5)([] [] [] [(1,4)] GAAC)
+(2,1)([(1,4)] [] [] [(1,2)] TAG)
+(1,1)([(1,2)] [] [] [] AAT)
+(1,2)([(1,3),(2,1)] [] [] [(1,1)] ATA)
+(1,4)([(1,5)] [] [] [(1,3),(2,1)] AGA)
+(1,3)([(1,4)] [] [] [(1,2)] TAG)
diff --git a/genomix/genomix-pregelix/data/input/read/part-0 b/genomix/genomix-pregelix/data/input/read/part-0
index 6595dfe..d702695 100755
--- a/genomix/genomix-pregelix/data/input/read/part-0
+++ b/genomix/genomix-pregelix/data/input/read/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read/part-1 b/genomix/genomix-pregelix/data/input/read/part-1
index 072d5f9..5bc30e8 100755
--- a/genomix/genomix-pregelix/data/input/read/part-1
+++ b/genomix/genomix-pregelix/data/input/read/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read/part-2 b/genomix/genomix-pregelix/data/input/read/part-2
index 3d132ac..474af09 100755
--- a/genomix/genomix-pregelix/data/input/read/part-2
+++ b/genomix/genomix-pregelix/data/input/read/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/read/part-3 b/genomix/genomix-pregelix/data/input/read/part-3
index df12383..98326a6 100755
--- a/genomix/genomix-pregelix/data/input/read/part-3
+++ b/genomix/genomix-pregelix/data/input/read/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-0 b/genomix/genomix-pregelix/data/input/tipremove/part-0
index a10ab8b..d45f051 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-0
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-1 b/genomix/genomix-pregelix/data/input/tipremove/part-1
index 20a3078..d50e98a 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-1
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-2 b/genomix/genomix-pregelix/data/input/tipremove/part-2
index aed9265..6ed28d6 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-2
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-3 b/genomix/genomix-pregelix/data/input/tipremove/part-3
index 8af7223..4734cf2 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-3
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-4 b/genomix/genomix-pregelix/data/input/tipremove/part-4
index 2fd77a1..84cd77d 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-4
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-4
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-5 b/genomix/genomix-pregelix/data/input/tipremove/part-5
index 7eb1649..9143122 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-5
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-5
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-6 b/genomix/genomix-pregelix/data/input/tipremove/part-6
index 8ae2314..14c675c 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-6
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-6
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/tipremove/part-7 b/genomix/genomix-pregelix/data/input/tipremove/part-7
index d2b9342..0c9ca53 100755
--- a/genomix/genomix-pregelix/data/input/tipremove/part-7
+++ b/genomix/genomix-pregelix/data/input/tipremove/part-7
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/unmerge_read/part-0 b/genomix/genomix-pregelix/data/input/unmerge_read/part-0
new file mode 100755
index 0000000..aab2f64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/unmerge_read/part-0
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/unmerge_read/part-1 b/genomix/genomix-pregelix/data/input/unmerge_read/part-1
new file mode 100755
index 0000000..d7b24b3
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/unmerge_read/part-1
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/unmerge_read/part-2 b/genomix/genomix-pregelix/data/input/unmerge_read/part-2
new file mode 100755
index 0000000..f82775c
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/unmerge_read/part-2
Binary files differ
diff --git a/genomix/genomix-pregelix/data/input/unmerge_read/part-3 b/genomix/genomix-pregelix/data/input/unmerge_read/part-3
new file mode 100755
index 0000000..b7a3925
--- /dev/null
+++ b/genomix/genomix-pregelix/data/input/unmerge_read/part-3
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
index 55ddb1b..63066fa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MergeBubbleMessageWritable.java
@@ -29,12 +29,12 @@
sourceVertexId = new PositionWritable();
chainVertexId = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
- message = Message.NON;
startVertexId = new PositionWritable();
+ message = Message.NON;
checkMessage = (byte) 0;
}
- public void set(MessageWritable msg) {
+ public void set(MergeBubbleMessageWritable msg) {
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
@@ -48,10 +48,14 @@
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
+ if (startVertexId != null) {
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(msg.getStartVertexId());
+ }
this.message = msg.getMessage();
}
- public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
+ public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, PositionWritable startVertexId, byte message) {
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
@@ -65,6 +69,10 @@
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(neighberNode);
}
+ if (startVertexId != null) {
+ checkMessage |= CheckMessage.START;
+ this.startVertexId.set(startVertexId);
+ }
this.message = message;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
new file mode 100644
index 0000000..6c24aa9
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.genomix.pregelix.operator.bridgeremove;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+
+/*
+ * vertexId: PositionWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.MessageWritable.
+ */
+/**
+ * Graph builder: in superstep 1, inserts bridge vertex (3,1) (chain TAGCCT) between vertices (1,2) and (2,2)
+ */
+public class BridgeAddVertex extends
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BridgeRemoveVertex.kmerSize";
+ public static final String LENGTH = "BridgeRemoveVertex.length";
+ public static int kmerSize = -1;
+ private int length = -1;
+
+ /**
+ * initiate kmerSize, length
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if(length == -1)
+ length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
+ if(getSuperstep() == 1){
+ if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
+ getVertexValue().getFFList().append(3, (byte)1);
+
+ //add bridge vertex
+ @SuppressWarnings("rawtypes")
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ PositionWritable vertexId = new PositionWritable();
+ ValueStateWritable vertexValue = new ValueStateWritable();
+ /**
+ * set the src vertex id
+ */
+ vertexId.set(3, (byte)1);
+ vertex.setVertexId(vertexId);
+ /**
+ * set the vertex value
+ */
+ byte[] array = { 'T', 'A', 'G', 'C', 'C', 'T'};
+ KmerBytesWritable kmer = new KmerBytesWritable(array.length);
+ kmer.setByRead(array, 0);
+ vertexValue.setMergeChain(kmer);
+ PositionListWritable plist = new PositionListWritable();
+ plist.append(new PositionWritable(1, (byte)2));
+ vertexValue.setRRList(plist);
+ PositionListWritable plist2 = new PositionListWritable();
+ plist2.append(new PositionWritable(2, (byte)2));
+ vertexValue.setFFList(plist2);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+ if(getVertexId().getReadID() == 2 && getVertexId().getPosInRead() == 2)
+ getVertexValue().getRRList().append(3, (byte)1);
+ }
+ voteToHalt();
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(BridgeAddVertex.class.getSimpleName());
+ job.setVertexClass(BridgeAddVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(DataCleanInputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 8716d8d..bc97dbd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -55,8 +55,10 @@
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
- private ArrayList<MessageWritable> receivedMsg = new ArrayList<MessageWritable>();
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
+ private Iterator<PositionWritable> iterator;
+ private PositionWritable pos = new PositionWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
/**
@@ -68,7 +70,7 @@
if(length == -1)
length = getContext().getConfiguration().getInt(LENGTH, kmerSize + 5);
outgoingMsg.reset();
- receivedMsg.clear();
+ receivedMsgList.clear();
}
/**
@@ -110,6 +112,121 @@
sendMsg(destVertexId, outgoingMsg);
}
}
+
+ /**
+ * broadcast kill self to all neighbors
+ */
+ public void broadcaseKillself(){
+ outgoingMsg.setSourceVertexId(getVertexId());
+ if(receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } // RR
+ else if(receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR){
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
+ deleteVertex(getVertexId());
+ }
+ }
+
+ /**
+ * do some remove operations on adjMap after receiving the info about dead Vertex
+ */
+ public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ iterator = getVertexValue().getRRList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ iterator = getVertexValue().getRFList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ iterator = getVertexValue().getFRList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ iterator = getVertexValue().getFFList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ }
+ }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
@@ -118,7 +235,7 @@
if(VertexUtil.isUpBridgeVertex(getVertexValue())){
sendMsgToAllNextNodes(getVertexValue());
}
- else if(VertexUtil.isUpBridgeVertex(getVertexValue())){
+ else if(VertexUtil.isDownBridgeVertex(getVertexValue())){
sendMsgToAllPreviousNodes(getVertexValue());
}
}
@@ -127,57 +244,17 @@
while (msgIterator.hasNext()) {
if(i == 3)
break;
- receivedMsg.add(msgIterator.next());
+ receivedMsgList.add(msgIterator.next());
i++;
}
- if(receivedMsg.size() == 2){
- if(getVertexValue().getLengthOfMergeChain() > length){
- outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsg.get(1).getMessage() == AdjMessage.FROMRR) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
- sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if (receivedMsg.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsg.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
- sendMsg(receivedMsg.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(receivedMsg.get(1).getSourceVertexId(), outgoingMsg);
- deleteVertex(getVertexId());
- }
+ if(receivedMsgList.size() == 2){
+ if(getVertexValue().getLengthOfMergeChain() <= length){
+ broadcaseKillself();
}
}
}
else if(getSuperstep() == 3){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
- //remove incomingMsg.getSourceId from RR positionList
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
- //remove incomingMsg.getSourceId from RF positionList
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
- //remove incomingMsg.getSourceId from FR positionList
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
- //remove incomingMsg.getSourceId from FF positionList
- }
- }
+ responseToDeadVertex(msgIterator);
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
new file mode 100644
index 0000000..b02ea06
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.genomix.pregelix.operator.bubblemerge;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+
+/*
+ * vertexId: PositionWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.genomix.pregelix.io.MessageWritable.
+ */
+/**
+ * Graph builder: in superstep 1, inserts bubble vertex (2,1) (chain TAGCCAG) between vertices (1,2) and (1,4)
+ */
+public class BubbleAddVertex extends
+ Vertex<PositionWritable, ValueStateWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BubbleAddVertex.kmerSize";
+ public static int kmerSize = -1;
+
+ /**
+ * initiate kmerSize
+ */
+ public void initVertex() {
+ if (kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ initVertex();
+ if(getSuperstep() == 1){
+ if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 2){
+ getVertexValue().getFFList().append(2, (byte)1);
+
+ //add bubble vertex
+ @SuppressWarnings("rawtypes")
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ PositionWritable vertexId = new PositionWritable();
+ ValueStateWritable vertexValue = new ValueStateWritable();
+ /**
+ * set the src vertex id
+ */
+ vertexId.set(2, (byte)1);
+ vertex.setVertexId(vertexId);
+ /**
+ * set the vertex value
+ */
+ byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
+ KmerBytesWritable kmer = new KmerBytesWritable(array.length);
+ kmer.setByRead(array, 0);
+ vertexValue.setMergeChain(kmer);
+ PositionListWritable plist = new PositionListWritable();
+ plist.append(new PositionWritable(1, (byte)2));
+ vertexValue.setRRList(plist);
+ PositionListWritable plist2 = new PositionListWritable();
+ plist2.append(new PositionWritable(1, (byte)4));
+ vertexValue.setFFList(plist2);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+ if(getVertexId().getReadID() == 1 && getVertexId().getPosInRead() == 4)
+ getVertexValue().getRRList().append(2, (byte)1);
+ }
+ voteToHalt();
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(BubbleAddVertex.class.getSimpleName());
+ job.setVertexClass(BubbleAddVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(DataCleanInputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index e16cd20..f64c36d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -58,11 +59,14 @@
private MergeBubbleMessageWritable incomingMsg = new MergeBubbleMessageWritable();
private MergeBubbleMessageWritable outgoingMsg = new MergeBubbleMessageWritable();
-
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
+
+ private Iterator<PositionWritable> iterator;
+ private PositionWritable pos = new PositionWritable();
private PositionWritable destVertexId = new PositionWritable();
private Iterator<PositionWritable> posIterator;
- private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsg = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
- private ArrayList<MergeBubbleMessageWritable> tmpMsg = new ArrayList<MergeBubbleMessageWritable>();
+ private Map<PositionWritable, ArrayList<MergeBubbleMessageWritable>> receivedMsgMap = new HashMap<PositionWritable, ArrayList<MergeBubbleMessageWritable>>();
+ private ArrayList<MergeBubbleMessageWritable> receivedMsgList = new ArrayList<MergeBubbleMessageWritable>();
/**
* initiate kmerSize, maxIteration
@@ -101,17 +105,93 @@
}
}
+ /**
+ * broadcast kill self to all neighbors. Pre-condition: vertex is a path vertex
+ */
+ public void broadcaseKillself(){
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ if(getVertexValue().getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ outgoingMsg.setMessage(AdjMessage.FROMFF);
+ else // #FRList() > 0
+ outgoingMsg.setMessage(AdjMessage.FROMFR);
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+
+ if(getVertexValue().getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ outgoingMsg.setMessage(AdjMessage.FROMRF);
+ else // #RRList() > 0
+ outgoingMsg.setMessage(AdjMessage.FROMRR);
+ sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+
+ deleteVertex(getVertexId());
+ }
+
+ /**
+ * do some remove operations on adjMap after receiving the info about dead Vertex
+ */
+ public void responseToDeadVertex(Iterator<MergeBubbleMessageWritable> msgIterator){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ //remove incomingMsg.getSourceId from RR positionList
+ iterator = getVertexValue().getRRList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ //remove incomingMsg.getSourceId from RF positionList
+ iterator = getVertexValue().getRFList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ //remove incomingMsg.getSourceId from FR positionList
+ iterator = getVertexValue().getFRList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ //remove incomingMsg.getSourceId from FF positionList
+ iterator = getVertexValue().getFFList().iterator();
+ while(iterator.hasNext()){
+ pos = iterator.next();
+ if(pos.equals(incomingMsg.getSourceVertexId())){
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
@Override
public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
- if(VertexUtil.isHeadVertex(getVertexValue())){
+ if(VertexUtil.isHeadVertex(getVertexValue())
+ || VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsgToAllNextNodes(getVertexValue());
}
} else if (getSuperstep() == 2){
while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
if(VertexUtil.isPathVertex(getVertexValue())){
+ outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
@@ -122,57 +202,71 @@
} else if (getSuperstep() == 3){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(!receivedMsg.containsKey(incomingMsg.getStartVertexId())){
- tmpMsg.clear();
- tmpMsg.add(incomingMsg);
- receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
+ receivedMsgList.clear();
+ receivedMsgList.add(incomingMsg);
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
}
else{
- tmpMsg.clear();
- tmpMsg.addAll(receivedMsg.get(incomingMsg.getStartVertexId()));
- tmpMsg.add(incomingMsg);
- receivedMsg.put(incomingMsg.getStartVertexId(), tmpMsg);
+ receivedMsgList.clear();
+ receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
+ receivedMsgList.add(incomingMsg);
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MergeBubbleMessageWritable>)receivedMsgList.clone());
}
}
- for(PositionWritable prevId : receivedMsg.keySet()){
- tmpMsg = receivedMsg.get(prevId);
- if(tmpMsg.size() > 1){
+ for(PositionWritable prevId : receivedMsgMap.keySet()){
+ receivedMsgList = receivedMsgMap.get(prevId);
+ if(receivedMsgList.size() > 1){
//find the node with largest length of mergeChain
boolean flag = true; //the same length
- int maxLength = tmpMsg.get(0).getLengthOfChain();
- PositionWritable max = tmpMsg.get(0).getSourceVertexId();
- for(int i = 1; i < tmpMsg.size(); i++){
- if(tmpMsg.get(i).getLengthOfChain() != maxLength)
+ int maxLength = receivedMsgList.get(0).getLengthOfChain();
+ PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
+ PositionWritable secondMax = receivedMsgList.get(0).getSourceVertexId();
+ for(int i = 1; i < receivedMsgList.size(); i++){
+ if(receivedMsgList.get(i).getLengthOfChain() != maxLength)
flag = false;
- if(tmpMsg.get(i).getLengthOfChain() > maxLength){
- maxLength = tmpMsg.get(i).getLengthOfChain();
- max = tmpMsg.get(i).getSourceVertexId();
+ if(receivedMsgList.get(i).getLengthOfChain() >= maxLength){
+ maxLength = receivedMsgList.get(i).getLengthOfChain();
+ secondMax.set(max);
+ max = receivedMsgList.get(i).getSourceVertexId();
}
}
- //send merge or unchange Message to node with largest length
+ //send unchange or merge Message to node with largest length
if(flag == true){
- //send unchange Message to node with largest length
- //we can send no message to complete this step
- //send delete Message to node which doesn't have largest length
- for(int i = 0; i < tmpMsg.size(); i++){
- if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ //1. send unchange Message to node with largest length
+ // we can send no message to complete this step
+ //2. send delete Message to node which doesn't have largest length
+ for(int i = 0; i < receivedMsgList.size(); i++){
+ //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
+ if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
outgoingMsg.setMessage(AdjMessage.KILL);
- sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
- } else {
+ outgoingMsg.setStartVertexId(prevId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(secondMax, outgoingMsg);
+ } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
outgoingMsg.setMessage(AdjMessage.UNCHANGE);
- sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ sendMsg(max, outgoingMsg);
}
}
} else{
//send merge Message to node with largest length
- for(int i = 0; i < tmpMsg.size(); i++){
- if(tmpMsg.get(i).getSourceVertexId().compareTo(max) != 0){
+ for(int i = 0; i < receivedMsgList.size(); i++){
+ //if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) != 0)
+ if(receivedMsgList.get(i).getSourceVertexId().compareTo(secondMax) == 0){
outgoingMsg.setMessage(AdjMessage.KILL);
- sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
- } else {
+ outgoingMsg.setStartVertexId(prevId);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
+ } else if(receivedMsgList.get(i).getSourceVertexId().compareTo(max) == 0){
outgoingMsg.setMessage(AdjMessage.MERGE);
/* add other node in message */
- sendMsg(tmpMsg.get(i).getSourceVertexId(), outgoingMsg);
+ // Find the message that came from secondMax and attach its chain to the
+ // MERGE message. The loop must advance j: incrementing the outer index i
+ // here left j stuck at 0, so the loop never terminated unless element 0 matched.
+ for(int j = 0; j < receivedMsgList.size(); j++){
+     if(receivedMsgList.get(j).getSourceVertexId().compareTo(secondMax) == 0){
+         outgoingMsg.setChainVertexId(receivedMsgList.get(j).getChainVertexId());
+         break;
+     }
+ }
+ sendMsg(receivedMsgList.get(i).getSourceVertexId(), outgoingMsg);
}
}
}
@@ -182,11 +276,15 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(incomingMsg.getMessage() == AdjMessage.KILL){
- deleteVertex(getVertexId());
+ broadcaseKillself();
} else if (incomingMsg.getMessage() == AdjMessage.MERGE){
//merge with small node
+ getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ incomingMsg.getChainVertexId()));
}
}
+ } else if(getSuperstep() == 5){
+ responseToDeadVertex(msgIterator);
}
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 84c7f52..a4730c0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -136,6 +136,14 @@
sendMsgToAllPreviousNodes(getVertexValue());
voteToHalt();
}
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setMessage(Message.START);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setMessage(Message.END);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ }
}
/**
@@ -143,7 +151,9 @@
*/
public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())) {
+ if (!VertexUtil.isPathVertex(getVertexValue())
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index 3d0d395..f27c140 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -155,8 +155,8 @@
public void initState(Iterator<MessageWritable> msgIterator) {
while (msgIterator.hasNext()) {
if (!VertexUtil.isPathVertex(getVertexValue())
- || !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- || !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
msgIterator.next();
voteToHalt();
} else {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 2966bb1..8c9fdc5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -3,6 +3,7 @@
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -81,6 +82,10 @@
/**
* set the vertex value
*/
+ byte[] array = { 'G', 'A', 'A'};
+ KmerBytesWritable kmer = new KmerBytesWritable(array.length);
+ kmer.setByRead(array, 0);
+ vertexValue.setMergeChain(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)4));
vertexValue.setRRList(plist);
@@ -93,8 +98,8 @@
}
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(TipRemoveVertex.class.getSimpleName());
- job.setVertexClass(TipRemoveVertex.class);
+ PregelixJob job = new PregelixJob(TipAddVertex.class.getSimpleName());
+ job.setVertexClass(TipAddVertex.class);
/**
* BinaryInput and BinaryOutput
*/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 0d5e518..4901269 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -55,8 +55,11 @@
private MessageWritable incomingMsg = new MessageWritable();
private MessageWritable outgoingMsg = new MessageWritable();
- Iterator<PositionWritable> iterator;
- PositionWritable pos = new PositionWritable();
+ private Iterator<PositionWritable> iterator;
+ private PositionWritable pos = new PositionWritable();
+ private PositionWritable destVertexId = new PositionWritable();
+ private Iterator<PositionWritable> posIterator;
+
/**
* initiate kmerSize, length
*/
@@ -67,28 +70,51 @@
length = getContext().getConfiguration().getInt(LENGTH, kmerSize); //kmerSize + 5
outgoingMsg.reset();
}
+
+ /**
+ * get destination vertex
+ */
+ public PositionWritable getNextDestVertexId(ValueStateWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ else // #FRList() > 0
+ posIterator = value.getFRList().iterator();
+ return posIterator.next();
+ }
+
+ public PositionWritable getPreDestVertexId(ValueStateWritable value) {
+ if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ else // #RRList() > 0
+ posIterator = value.getRRList().iterator();
+ return posIterator.next();
+ }
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() >= length){
+ if(getVertexValue().getLengthOfMergeChain() <= length){
if(getVertexValue().getFFList().getCountOfPosition() > 0)
outgoingMsg.setMessage(AdjMessage.FROMFF);
else if(getVertexValue().getFRList().getCountOfPosition() > 0)
outgoingMsg.setMessage(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
deleteVertex(getVertexId());
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() > length){
+ if(getVertexValue().getLengthOfMergeChain() <= length){
if(getVertexValue().getRFList().getCountOfPosition() > 0)
outgoingMsg.setMessage(AdjMessage.FROMRF);
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
outgoingMsg.setMessage(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getPreDestVertexId(getVertexValue()));
+ sendMsg(destVertexId, outgoingMsg);
deleteVertex(getVertexId());
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 9f59282..785c7f5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -11,6 +11,10 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
+import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
+import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
+import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
@@ -79,7 +83,7 @@
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(TipAddVertex.class);
job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(PositionWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
@@ -108,13 +112,85 @@
generateTipRemoveGraphJob("TipRemoveGraph", outputBase
+ "TipRemoveGraph.xml");
}
+
+ /**
+  * Builds the Pregelix job that runs {@code BridgeAddVertex} (kmer size 3,
+  * NaiveAlgorithmForPathMerge input format, DataClean output format) and
+  * writes its configuration as XML to {@code outputPath}.
+  *
+  * @param jobName    name given to the PregelixJob
+  * @param outputPath file the job configuration XML is written to
+  * @throws IOException if the job cannot be created or the file cannot be written
+  */
+ private static void generateBridgeAddGraphJob(String jobName, String outputPath) throws IOException {
+     PregelixJob job = new PregelixJob(jobName);
+     job.setVertexClass(BridgeAddVertex.class);
+     job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+     job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+     job.setDynamicVertexValueSize(true);
+     job.setOutputKeyClass(PositionWritable.class);
+     job.setOutputValueClass(ValueStateWritable.class);
+     job.getConfiguration().setInt(BridgeAddVertex.KMER_SIZE, 3);
+     // Close the stream explicitly so the file handle is not leaked.
+     FileOutputStream xmlOut = new FileOutputStream(new File(outputPath));
+     try {
+         job.getConfiguration().writeXml(xmlOut);
+     } finally {
+         xmlOut.close();
+     }
+ }
+ private static void genBridgeAddGraph() throws IOException {
+ generateBridgeAddGraphJob("BridgeAddGraph", outputBase
+ + "BridgeAddGraph.xml");
+ }
+
+ /**
+  * Builds the Pregelix job that runs {@code BridgeRemoveVertex} (kmer size 3,
+  * DataClean input and output formats) and writes its configuration as XML
+  * to {@code outputPath}.
+  *
+  * @param jobName    name given to the PregelixJob
+  * @param outputPath file the job configuration XML is written to
+  * @throws IOException if the job cannot be created or the file cannot be written
+  */
+ private static void generateBridgeRemoveGraphJob(String jobName, String outputPath) throws IOException {
+     PregelixJob job = new PregelixJob(jobName);
+     job.setVertexClass(BridgeRemoveVertex.class);
+     job.setVertexInputFormatClass(DataCleanInputFormat.class);
+     job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+     job.setDynamicVertexValueSize(true);
+     job.setOutputKeyClass(PositionWritable.class);
+     job.setOutputValueClass(ValueStateWritable.class);
+     // Set the kmer size under the key of the vertex class this job runs;
+     // the TipRemoveVertex key used before was a copy-paste leftover.
+     job.getConfiguration().setInt(BridgeRemoveVertex.KMER_SIZE, 3);
+     // Close the stream explicitly so the file handle is not leaked.
+     FileOutputStream xmlOut = new FileOutputStream(new File(outputPath));
+     try {
+         job.getConfiguration().writeXml(xmlOut);
+     } finally {
+         xmlOut.close();
+     }
+ }
+
+ private static void genBridgeRemoveGraph() throws IOException {
+ generateBridgeRemoveGraphJob("BridgeRemoveGraph", outputBase
+ + "BridgeRemoveGraph.xml");
+ }
+
+ /**
+  * Builds the Pregelix job that runs {@code BubbleAddVertex} (kmer size 3,
+  * NaiveAlgorithmForPathMerge input format, DataClean output format) and
+  * writes its configuration as XML to {@code outputPath}.
+  *
+  * @param jobName    name given to the PregelixJob
+  * @param outputPath file the job configuration XML is written to
+  * @throws IOException if the job cannot be created or the file cannot be written
+  */
+ private static void generateBubbleAddGraphJob(String jobName, String outputPath) throws IOException {
+     PregelixJob job = new PregelixJob(jobName);
+     job.setVertexClass(BubbleAddVertex.class);
+     job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+     job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+     job.setDynamicVertexValueSize(true);
+     job.setOutputKeyClass(PositionWritable.class);
+     job.setOutputValueClass(ValueStateWritable.class);
+     job.getConfiguration().setInt(BubbleAddVertex.KMER_SIZE, 3);
+     // Close the stream explicitly so the file handle is not leaked.
+     FileOutputStream xmlOut = new FileOutputStream(new File(outputPath));
+     try {
+         job.getConfiguration().writeXml(xmlOut);
+     } finally {
+         xmlOut.close();
+     }
+ }
+
+ private static void genBubbleAddGraph() throws IOException {
+ generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+ + "BubbleAddGraph.xml");
+ }
+
+ /**
+  * Builds the Pregelix job that runs {@code BubbleMergeVertex} (kmer size 3,
+  * DataClean input and output formats) and writes its configuration as XML
+  * to {@code outputPath}.
+  *
+  * @param jobName    name given to the PregelixJob
+  * @param outputPath file the job configuration XML is written to
+  * @throws IOException if the job cannot be created or the file cannot be written
+  */
+ private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
+     PregelixJob job = new PregelixJob(jobName);
+     job.setVertexClass(BubbleMergeVertex.class);
+     job.setVertexInputFormatClass(DataCleanInputFormat.class);
+     job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+     job.setDynamicVertexValueSize(true);
+     job.setOutputKeyClass(PositionWritable.class);
+     job.setOutputValueClass(ValueStateWritable.class);
+     job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 3);
+     // Close the stream explicitly so the file handle is not leaked.
+     FileOutputStream xmlOut = new FileOutputStream(new File(outputPath));
+     try {
+         job.getConfiguration().writeXml(xmlOut);
+     } finally {
+         xmlOut.close();
+     }
+ }
+
+ private static void genBubbleMergeGraph() throws IOException {
+ generateBubbleMergeGraphJob("BubbleMergeGraph", outputBase
+ + "BubbleMergeGraph.xml");
+ }
+
public static void main(String[] args) throws IOException {
- genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ //genNaiveAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
+ //genBridgeAddGraph();
+ //genBridgeRemoveGraph();
+ //genBubbleAddGraph();
+ //genBubbleMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 49a5ed0..f86ef23 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,7 +45,7 @@
public static final String PreFix = "data/input"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "tipremove"};/*, PreFix + File.separator
+ + "read"};/*, PreFix + File.separator
/*+ "CyclePath"};, PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index c5aa0eb..1984a2e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -50,7 +50,7 @@
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/tworeads.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -71,8 +71,8 @@
@Test
public void TestAll() throws Exception {
- TestEndToEnd();
- //TestUnMergedNode();
+ //TestEndToEnd();
+ TestUnMergedNode();
}
public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/NaiveAlgorithmForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml
similarity index 96%
rename from genomix/genomix-pregelix/src/test/resources/jobs/NaiveAlgorithmForMergeGraph.xml
rename to genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml
index 39ed64e..2d6b9c4 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/NaiveAlgorithmForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/LogAlgorithmForMergeGraph.xml
@@ -3,7 +3,6 @@
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>NaiveAlgorithmForPathMergeVertex.kmerSize</name><value>3</value></property>
<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
<property><name>mapred.submit.replication</name><value>10</value></property>
@@ -36,7 +35,7 @@
<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
<property><name>fs.checkpoint.size</name><value>67108864</value></property>
<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>NaiveAlgorithmForMergeGraph</value></property>
+<property><name>mapred.job.name</name><value>LogAlgorithmForMergeGraph</value></property>
<property><name>local.cache.size</name><value>10737418240</value></property>
<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
@@ -65,7 +64,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -104,6 +103,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>LogAlgorithmForPathMergeVertex.kmerSize</name><value>3</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -117,12 +117,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>