Merge branch 'anbangx/fullstack_genomix' into genomix/fullstack_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 2c64139..2c4ac0d 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -37,6 +37,12 @@
public static final byte DIR_RR = 0b11 << 0;
public static final byte DIR_MASK = 0b11 << 0;
}
+
+ public static class MergeDirFlag extends DirectionFlag{ // adds constants for bit 2 on top of the inherited DirectionFlag constants
+ public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2; // value 0: bit 2 clear
+ public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2; // value 4: bit 2 set
+ public static final byte SHOULD_MERGE_MASK = 0b1 << 2; // isolates bit 2; compare (flag & SHOULD_MERGE_MASK) against the two constants above, because SHOULD_MERGEWITHNEXT is 0 and cannot be tested with a plain AND
+ }
private PositionWritable nodeID;
private PositionListWritable forwardForwardList;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index c793609..8620445 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -163,10 +163,12 @@
}
public void append(PositionWritable pos) {
- setSize((1 + valueCount) * PositionWritable.LENGTH);
- System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
- * PositionWritable.LENGTH, pos.getLength());
- valueCount += 1;
+ if(pos != null){ // a null pos is ignored: nothing is copied and valueCount is left unchanged
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+ * PositionWritable.LENGTH, pos.getLength());
+ valueCount += 1;
+ }
}
public void append(int readID, byte posInRead) {
diff --git a/genomix/genomix-pregelix/data/graphbuild.test/read.txt b/genomix/genomix-pregelix/data/graphbuild.test/read.txt
index bd5f3c4..37f02de 100755
--- a/genomix/genomix-pregelix/data/graphbuild.test/read.txt
+++ b/genomix/genomix-pregelix/data/graphbuild.test/read.txt
@@ -1 +1 @@
-1 AATAGAAC
+1 AATAGAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index 4c38255..c4365ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,7 +8,6 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -36,8 +35,7 @@
@Override
public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- if(vertex.getVertexValue().getState() != State.END_VERTEX)
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index cf254d1..f9a3444 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -70,7 +70,7 @@
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setKmer(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 95e24be..32baccd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -41,7 +41,7 @@
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getMergeChain());
+ vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
getRecordWriter().write(node, nul);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
index d10e625..b9af850 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeInputFormat.java
@@ -73,7 +73,7 @@
vertexValue.setFRList(node.getFRList());
vertexValue.setRFList(node.getRFList());
vertexValue.setRRList(node.getRRList());
- vertexValue.setMergeChain(node.getKmer());
+ vertexValue.setKmer(node.getKmer());
vertexValue.setState(State.NON_VERTEX);
vertex.setVertexValue(vertexValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index f9f3749..7a1bdfa 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -41,7 +41,7 @@
throws IOException, InterruptedException {
node.set(vertex.getVertexId(), vertex.getVertexValue().getFFList(),
vertex.getVertexValue().getFRList(), vertex.getVertexValue().getRFList(),
- vertex.getVertexValue().getRRList(), vertex.getVertexValue().getMergeChain());
+ vertex.getVertexValue().getRRList(), vertex.getVertexValue().getKmer());
getRecordWriter().write(node, nullWritable);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 935da50..314dff6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -19,19 +19,17 @@
* file stores the point to the file that stores the chains of connected DNA
*/
private PositionWritable sourceVertexId;
- private KmerBytesWritable chainVertexId;
+ private KmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
- private byte message;
- private byte adjMessage;
+ private byte flag;
private byte checkMessage;
public MessageWritable() {
sourceVertexId = new PositionWritable();
- chainVertexId = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
- message = Message.NON;
- adjMessage = AdjMessage.NON;
+ flag = Message.NON;
checkMessage = (byte) 0;
}
@@ -41,17 +39,16 @@
checkMessage |= CheckMessage.SOURCE;
this.sourceVertexId.set(msg.getSourceVertexId());
}
- if (chainVertexId != null) {
+ if (kmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(msg.getChainVertexId());
+ this.kmer.set(msg.getKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(msg.getNeighberNode());
}
checkMessage |= CheckMessage.ADJMSG;
- this.adjMessage = msg.getAdjMessage();
- this.message = msg.getMessage();
+ this.flag = msg.getFlag();
}
public void set(PositionWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -62,20 +59,20 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
+ this.kmer.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
this.neighberNode.set(neighberNode);
}
- this.message = message;
+ this.flag = message;
}
public void reset() {
checkMessage = 0;
- chainVertexId.reset(1);
+ kmer.reset(1);
neighberNode.reset();
- message = Message.NON;
+ flag = Message.NON;
}
public PositionWritable getSourceVertexId() {
@@ -89,14 +86,14 @@
}
}
- public KmerBytesWritable getChainVertexId() {
- return chainVertexId;
+ public KmerBytesWritable getKmer() {
+ return kmer;
}
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
+ this.kmer.set(chainVertexId);
}
}
@@ -112,24 +109,15 @@
}
public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
- }
-
- public byte getAdjMessage() {
- return adjMessage;
+ return kmer.getKmerLength();
}
- public void setAdjMessage(byte adjMessage) {
- checkMessage |= CheckMessage.ADJMSG;
- this.adjMessage = adjMessage;
+ public byte getFlag() {
+ return flag;
}
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
+ public void setFlag(byte message) {
+ this.flag = message;
}
@Override
@@ -138,12 +126,10 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
+ kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
- if ((checkMessage & CheckMessage.ADJMSG) != 0)
- out.write(adjMessage);
- out.write(message);
+ out.write(flag);
}
@Override
@@ -153,12 +139,10 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
+ kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
- if ((checkMessage & CheckMessage.ADJMSG) != 0)
- adjMessage = in.readByte();
- message = in.readByte();
+ flag = in.readByte();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index de483bc..2defa0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -4,6 +4,7 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
@@ -14,39 +15,39 @@
private AdjacencyListWritable incomingList;
private AdjacencyListWritable outgoingList;
private byte state;
- private KmerBytesWritable mergeChain;
+ private KmerBytesWritable kmer;
private PositionWritable mergeDest;
public VertexValueWritable() {
incomingList = new AdjacencyListWritable();
outgoingList = new AdjacencyListWritable();
state = State.NON_VERTEX;
- mergeChain = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable(0);
mergeDest = new PositionWritable();
}
public VertexValueWritable(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
- byte state, KmerBytesWritable mergeChain) {
+ byte state, KmerBytesWritable kmer) {
set(forwardForwardList, forwardReverseList,
reverseForwardList, reverseReverseList,
- state, mergeChain);
+ state, kmer);
}
public void set(PositionListWritable forwardForwardList, PositionListWritable forwardReverseList,
PositionListWritable reverseForwardList, PositionListWritable reverseReverseList,
- byte state, KmerBytesWritable mergeChain) {
+ byte state, KmerBytesWritable kmer) {
this.incomingList.setForwardList(reverseForwardList);
this.incomingList.setReverseList(reverseReverseList);
this.outgoingList.setForwardList(forwardForwardList);
this.outgoingList.setReverseList(forwardReverseList);
this.state = state;
- this.mergeChain.set(mergeChain);
+ this.kmer.set(kmer);
}
public void set(VertexValueWritable value) {
set(value.getFFList(),value.getFRList(),value.getRFList(),value.getRRList(),value.getState(),
- value.getMergeChain());
+ value.getKmer());
}
public PositionListWritable getFFList() {
@@ -105,16 +106,16 @@
this.state = state;
}
- public int getLengthOfMergeChain() {
- return mergeChain.getKmerLength();
+ public int getLengthOfKmer() {
+ return kmer.getKmerLength();
}
- public KmerBytesWritable getMergeChain() {
- return mergeChain;
+ public KmerBytesWritable getKmer() {
+ return kmer;
}
- public void setMergeChain(KmerBytesWritable mergeChain) {
- this.mergeChain.set(mergeChain);
+ public void setKmer(KmerBytesWritable kmer) {
+ this.kmer.set(kmer);
}
public PositionWritable getMergeDest() {
@@ -130,7 +131,7 @@
incomingList.readFields(in);
outgoingList.readFields(in);
state = in.readByte();
- mergeChain.readFields(in);
+ kmer.readFields(in);
mergeDest.readFields(in);
}
@@ -139,7 +140,7 @@
incomingList.write(out);
outgoingList.write(out);
out.writeByte(state);
- mergeChain.write(out);
+ kmer.write(out);
mergeDest.write(out);
}
@@ -156,7 +157,7 @@
sbuilder.append(outgoingList.getReverseList().toString()).append('\t');
sbuilder.append(incomingList.getForwardList().toString()).append('\t');
sbuilder.append(incomingList.getReverseList().toString()).append('\t');
- sbuilder.append(mergeChain.toString()).append(')');
+ sbuilder.append(kmer.toString()).append(')');
return sbuilder.toString();
}
@@ -167,4 +168,88 @@
public int outDegree(){
return outgoingList.getForwardList().getCountOfPosition() + outgoingList.getReverseList().getCountOfPosition();
}
+
+ /*
+ * Edge update: removes nodeToDelete from the adjacency list selected by (neighborToDeleteDir & DIR_MASK), then appends nodeToAdd to the list selected by (neighborToMergeDir & DIR_MASK); direction bits that match no case leave the lists untouched.
+ */
+ public void processUpdates(byte neighborToDeleteDir, PositionWritable nodeToDelete,
+ byte neighborToMergeDir, PositionWritable nodeToAdd){
+// TODO: collapse the two switches below into a direction-to-list lookup, e.g.
+// this.getListFromDir(neighborToDeleteDir).remove(nodeToDelete);
+// this.getListFromDir(neighborToMergeDir).append(nodeToAdd);
+
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().append(nodeToAdd);
+ break;
+ }
+ }
+
+ /*
+ * Merge update: removes nodeToDelete from the list selected by (neighborToDeleteDir & DIR_MASK), then merges the kmer argument into this vertex's kmer with the direction-specific mergeWith??Kmer(kmerSize, kmer) call and appends nodeToAdd to the list for (neighborToMergeDir & DIR_MASK). The kmer parameter shadows the kmer field, so the field is reached through getKmer().
+ */
+ public void processMerges(byte neighborToDeleteDir, PositionWritable nodeToDelete,
+ byte neighborToMergeDir, PositionWritable nodeToAdd,
+ int kmerSize, KmerBytesWritable kmer){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete); //set(null);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ // TODO: remove switch below and replace with general direction merge
+// this.getKmer().mergeWithDirKmer(neighborToMergeDir);
+
+ switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getKmer().mergeWithFFKmer(kmerSize, kmer);
+ this.getFFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getKmer().mergeWithFRKmer(kmerSize, kmer);
+ this.getFRList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getKmer().mergeWithRFKmer(kmerSize, kmer);
+ this.getRFList().append(nodeToAdd);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getKmer().mergeWithRRKmer(kmerSize, kmer);
+ this.getRRList().append(nodeToAdd);
+ break;
+ }
+ }
+
+
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index dca2cb8..c29e1b3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -75,10 +75,10 @@
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
}
- builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getMessage()) + "\r\n");
+ builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
if (msg.getLengthOfChain() != -1) {
- chain = msg.getChainVertexId().toString();
+ chain = msg.getKmer().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
index 8525a0a..ca42f4e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeAddVertex.java
@@ -89,7 +89,7 @@
byte[] array = { 'T', 'A', 'G', 'C', 'C', 'T'};
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)2));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
index 2770ab8..cf312db 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bridgeremove/BridgeRemoveVertex.java
@@ -79,14 +79,14 @@
public void sendMsgToAllNextNodes(VertexValueWritable value) {
posIterator = value.getFFList().iterator(); // FFList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getFRList().iterator(); // FRList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -99,14 +99,14 @@
public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
posIterator = value.getRFList().iterator(); // RFList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
}
posIterator = value.getRRList().iterator(); // RRList
while(posIterator.hasNext()){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(posIterator.next());
sendMsg(destVertexId, outgoingMsg);
@@ -118,61 +118,61 @@
*/
public void broadcaseKillself(){
outgoingMsg.setSourceVertexId(getVertexId());
- if(receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ if(receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRR) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRR) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(0).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(1).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(0).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(1).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
} // RR
- else if(receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR){
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ else if(receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR){
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFF
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFF
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRR) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRR) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
- } else if (receivedMsgList.get(1).getMessage() == AdjMessage.FROMFR
- && receivedMsgList.get(0).getMessage() == AdjMessage.FROMRF) {
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ } else if (receivedMsgList.get(1).getFlag() == AdjMessage.FROMFR
+ && receivedMsgList.get(0).getFlag() == AdjMessage.FROMRF) {
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
sendMsg(receivedMsgList.get(1).getSourceVertexId(), outgoingMsg);
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
sendMsg(receivedMsgList.get(0).getSourceVertexId(), outgoingMsg);
deleteVertex(getVertexId());
}
@@ -184,7 +184,7 @@
public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ if(incomingMsg.getFlag() == AdjMessage.FROMFF){
//remove incomingMsg.getSourceId from RR positionList
iterator = getVertexValue().getRRList().iterator();
while(iterator.hasNext()){
@@ -194,7 +194,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
//remove incomingMsg.getSourceId from RF positionList
iterator = getVertexValue().getRFList().iterator();
while(iterator.hasNext()){
@@ -204,7 +204,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
//remove incomingMsg.getSourceId from FR positionList
iterator = getVertexValue().getFRList().iterator();
while(iterator.hasNext()){
@@ -214,7 +214,7 @@
break;
}
}
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
//remove incomingMsg.getSourceId from FF positionList
iterator = getVertexValue().getFFList().iterator();
while(iterator.hasNext()){
@@ -248,7 +248,7 @@
i++;
}
if(receivedMsgList.size() == 2){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
broadcaseKillself();
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
index 63ac5b5..940d149 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleAddVertex.java
@@ -85,7 +85,7 @@
byte[] array = { 'T', 'A', 'G', 'C', 'C', 'A', 'G'}; //TAGCCAG
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)2));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 396d43d..cd449b5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -181,7 +181,7 @@
public void compute(Iterator<MergeBubbleMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
- if(VertexUtil.isHeadVertex(getVertexValue())
+ if(VertexUtil.isHeadVertexWithIndegree(getVertexValue())
|| VertexUtil.isHeadWithoutIndegree(getVertexValue())){
outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setSourceVertexId(getVertexId());
@@ -194,7 +194,7 @@
outgoingMsg.setMessage(AdjMessage.NON);
outgoingMsg.setStartVertexId(incomingMsg.getSourceVertexId());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
}
@@ -217,7 +217,7 @@
for(PositionWritable prevId : receivedMsgMap.keySet()){
receivedMsgList = receivedMsgMap.get(prevId);
if(receivedMsgList.size() > 1){
- //find the node with largest length of mergeChain
+ //find the node with largest length of Kmer
boolean flag = true; //the same length
int maxLength = receivedMsgList.get(0).getLengthOfChain();
PositionWritable max = receivedMsgList.get(0).getSourceVertexId();
@@ -279,7 +279,7 @@
broadcaseKillself();
} else if (incomingMsg.getMessage() == AdjMessage.MERGE){
//merge with small node
- getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
incomingMsg.getChainVertexId()));
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
new file mode 100644
index 0000000..9238f9d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -0,0 +1,451 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/**
+ * Base vertex for the path-merge algorithms: shared message/flag helpers; compute() is empty here and supplied by subclasses
+ */
+public class BasicPathMergeVertex extends
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "BasicPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "BasicPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ protected int maxIteration = -1;
+
+ protected MessageWritable incomingMsg = new MessageWritable();
+ protected MessageWritable outgoingMsg = new MessageWritable();
+ protected PositionWritable destVertexId = new PositionWritable();
+ protected Iterator<PositionWritable> posIterator;
+ byte headFlag;
+ protected byte outFlag;
+
+ /**
+ * Lazily reads kmerSize (default 5) and maxIteration (default 1000000) from the job configuration, then clears outFlag and outgoingMsg.
+ */
+ public void initVertex() {
+ if (kmerSize == -1) // static field: only looked up while it still holds the -1 sentinel
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0) // negative value means it has not been loaded yet
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ outFlag = (byte)0; // start each call with no outgoing flag bits set
+ outgoingMsg.reset();
+ }
+
+ /**
+ * Recomputes headFlag as the IS_HEAD bit of this vertex's current state (zero when the bit is clear).
+ */
+ public void resetHeadFlag(){
+ headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+ }
+
+ /**
+ * Returns the first successor id (from the FF list if it has entries, otherwise from the FR list) and ORs the matching DIR_* bits into outFlag.
+ */
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+ if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ posIterator = value.getFFList().iterator();
+ outFlag |= MessageFlag.DIR_FF;
+ }
+ else{ // FF list is empty; presumably #FRList() > 0, but that is not checked here
+ posIterator = value.getFRList().iterator();
+ outFlag |= MessageFlag.DIR_FR;
+ }
+ return posIterator.next(); // NOTE(review): assumes the chosen list has at least one entry -- confirm against callers
+ }
+
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) { // first predecessor id; also ORs its DIR_* bits into outFlag
+ if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ posIterator = value.getRFList().iterator();
+ outFlag |= MessageFlag.DIR_RF;
+ }
+ else{ // RF list is empty; presumably #RRList() > 0, but that is not checked here
+ posIterator = value.getRRList().iterator();
+ outFlag |= MessageFlag.DIR_RR;
+ }
+ return posIterator.next(); // NOTE(review): assumes the chosen list has at least one entry -- confirm against callers
+ }
+
+    /**
+     * Broadcasts the current outgoingMsg to every successor: FF-list entries first, then FR-list entries.
+     */
+    public void sendMsgToAllNextNodes(VertexValueWritable value) {
+        // successors listed in the FF list
+        for (posIterator = value.getFFList().iterator(); posIterator.hasNext(); ) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        // successors listed in the FR list
+        for (posIterator = value.getFRList().iterator(); posIterator.hasNext(); ) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * Broadcasts the current outgoingMsg to every predecessor: RF-list entries first, then RR-list entries.
+     */
+    public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
+        // predecessors listed in the RF list
+        for (posIterator = value.getRFList().iterator(); posIterator.hasNext(); ) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        // predecessors listed in the RR list
+        for (posIterator = value.getRRList().iterator(); posIterator.hasNext(); ) {
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+
+    /**
+     * Sends the current outgoingMsg to every neighbor: successors (FF, FR lists) and predecessors (RF, RR lists).
+     */
+    public void sendMsgToAllNeighborNodes(VertexValueWritable value){
+        posIterator = value.getFFList().iterator(); // FFList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getFRList().iterator(); // FRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRFList().iterator(); // RFList: predecessors must be covered too
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+        posIterator = value.getRRList().iterator(); // RRList
+        while(posIterator.hasNext()){
+            destVertexId.set(posIterator.next());
+            sendMsg(destVertexId, outgoingMsg);
+        }
+    }
+ /**
+ * Kick-off step: for each head/rear category this vertex matches, sends an IS_HEAD-flagged message and votes to halt.
+ */
+ public void startSendMsg() {
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) { // the four checks are independent ifs, so more than one may fire
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsgToAllNextNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD); // rear vertices also send IS_HEAD; there is no separate rear flag here
+ sendMsgToAllPreviousNodes(getVertexValue());
+ voteToHalt();
+ }
+ if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
+ outgoingMsg.setFlag(MessageFlag.IS_HEAD);
+ sendMsg(getVertexId(), outgoingMsg); //send to itself
+ voteToHalt();
+ }
+ }
+
+ /**
+ * Drains the message iterator: eligible vertices keep the message and set their state to IS_HEAD; all others discard it and vote to halt.
+ */
+ public void initState(Iterator<MessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!VertexUtil.isPathVertex(getVertexValue()) // not eligible: neither a path vertex ...
+ && !VertexUtil.isHeadWithoutIndegree(getVertexValue()) // ... nor a head without indegree ...
+ && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) { // ... nor a rear without outdegree
+ msgIterator.next(); // consume and ignore the message
+ voteToHalt();
+ } else {
+ incomingMsg = msgIterator.next(); // only the last message read remains in incomingMsg
+ getVertexValue().setState(MessageFlag.IS_HEAD); // NOTE(review): setState presumably replaces the whole state byte -- confirm
+ }
+ }
+ }
+
+    /**
+     * Tells whether this vertex needs to be flipped with its successor: true when the FR list reports a positive length.
+     */
+    public boolean ifFilpWithSuccessor(){
+        // The decision depends only on the FR list's getLength().
+        final boolean frListHasContent =
+                getVertexValue().getFRList().getLength() > 0;
+        return frListHasContent;
+    }
+
+    /**
+     * Tells whether this vertex needs to be flipped with its predecessor: true when the RF list reports a positive length.
+     */
+    public boolean ifFlipWithPredecessor(){
+        // The decision depends only on the RF list's getLength().
+        final boolean rfListHasContent =
+                getVertexValue().getRFList().getLength() > 0;
+        return rfListHasContent;
+    }
+
+    /**
+     * Records the direction towards the successor in outFlag: DIR_FF when the FF list has content, DIR_FR otherwise.
+     */
+    public void setSuccessorAdjMsg(){
+        final boolean ffListHasContent =
+                getVertexValue().getFFList().getLength() > 0;
+        // Prefer FF; otherwise fall back to FR.
+        outFlag |= ffListHasContent ? MessageFlag.DIR_FF : MessageFlag.DIR_FR;
+    }
+
+    /**
+     * Records the direction towards the predecessor in outFlag: DIR_RF when the RF list has content, DIR_RR otherwise.
+     */
+    public void setPredecessorAdjMsg(){
+        final boolean rfListHasContent =
+                getVertexValue().getRFList().getLength() > 0;
+        // Prefer RF; otherwise fall back to RR.
+        outFlag |= rfListHasContent ? MessageFlag.DIR_RF : MessageFlag.DIR_RR;
+    }
+
+    /**
+     * Sends an adjacency-update message to the neighbor on the side opposite the pending merge.
+     * The merge side is read from the SHOULD_MERGE_MASK bits of the vertex state; IS_HEAD is forwarded in the flag.
+     */
+    public void broadcastUpdateMsg(){
+        if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+            outFlag |= MessageFlag.IS_HEAD;
+        switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){ // named mask so both case labels are reachable
+            case MessageFlag.SHOULD_MERGEWITHPREV: // merging backwards: tell the successor about our incoming list
+                setSuccessorAdjMsg();
+                if(ifFlipWithPredecessor())
+                    outFlag |= MessageFlag.FLIP;
+                outgoingMsg.setFlag(outFlag);
+                outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+                break;
+            case MessageFlag.SHOULD_MERGEWITHNEXT: // merging forwards: tell the predecessor about our outgoing list
+                setPredecessorAdjMsg();
+                if(ifFilpWithSuccessor())
+                    outFlag |= MessageFlag.FLIP;
+                outgoingMsg.setFlag(outFlag);
+                outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+                outgoingMsg.setSourceVertexId(getVertexId());
+                sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+                break;
+        }
+    }
+
+ /**
+ * Replies to the sender of incomingMsg with a merge message carrying this vertex's kmer (used by P2).
+ * A vertex whose state has IS_HEAD is switched to IS_OLDHEAD in its stored state before replying.
+ */
+ public void sendMergeMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0){
+ byte newState = getVertexValue().getState();
+ newState &= ~MessageFlag.IS_HEAD; // clear IS_HEAD in the stored state ...
+ newState |= MessageFlag.IS_OLDHEAD; // ... and mark the vertex as a former head
+ getVertexValue().setState(newState);
+ outFlag |= MessageFlag.IS_HEAD; // the outgoing message still announces IS_HEAD
+ voteToHalt();
+ } else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
+ outFlag |= MessageFlag.IS_OLDHEAD;
+ voteToHalt();
+ }
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK); // direction bits carried by the incoming flag
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); // reply to the sender; was getNextDestVertexId(getVertexValue())
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); // reply to the sender; was getPreDestVertexId(getVertexValue())
+ break;
+ }
+ }
+
+ /**
+ * Sends a merge message carrying this vertex's kmer to the neighbor selected by the SHOULD_MERGE_MASK bits of its state (used by P4).
+ * MERGEWITHNEXT targets the first successor with the incoming list; MERGEWITHPREV targets the first predecessor with the outgoing list.
+ */
+ public void broadcastMergeMsg(){
+ if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
+ outFlag |= MessageFlag.IS_HEAD; // forward the head marker with the merge message
+ switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
+ case MessageFlag.SHOULD_MERGEWITHNEXT:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHPREV:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ }
+ }
+
+ /**
+ * Marks this vertex as SHOULD_MERGEWITHNEXT, records its first FF (else FR) neighbor as the merge destination,
+ * and then calls broadcastUpdateMsg().
+ */
+ public void sendUpMsgToPredecessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHNEXT;
+ getVertexValue().setState(state);
+ if(getVertexValue().getFFList().getLength() > 0) // prefer the FF neighbor as merge destination
+ getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0)); // NOTE(review): assumes FR list is non-empty here -- confirm
+ broadcastUpdateMsg();
+ }
+
+ /**
+ * Marks this vertex as SHOULD_MERGEWITHPREV, records its first RF (else RR) neighbor as the merge destination,
+ * and then calls broadcastUpdateMsg().
+ */
+ public void sendUpMsgToSuccessor(){
+ byte state = getVertexValue().getState();
+ state |= MessageFlag.SHOULD_MERGEWITHPREV;
+ getVertexValue().setState(state);
+ if(getVertexValue().getRFList().getLength() > 0) // prefer the RF neighbor as merge destination
+ getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
+ else
+ getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0)); // NOTE(review): assumes RR list is non-empty here -- confirm
+ broadcastUpdateMsg();
+ }
+
+    /**
+     * Returns the edge direction for B->A when the A->B edge has direction dir; throws RuntimeException for any other value.
+     */
+    public byte mirrorDirection(byte dir) {
+        switch (dir) {
+            case MessageFlag.DIR_FF:
+                return MessageFlag.DIR_RR;
+            case MessageFlag.DIR_FR: // FR and RF are their own mirror images
+                return MessageFlag.DIR_FR;
+            case MessageFlag.DIR_RF:
+                return MessageFlag.DIR_RF;
+            case MessageFlag.DIR_RR:
+                return MessageFlag.DIR_FF;
+            default:
+                throw new RuntimeException("Unrecognized direction in mirrorDirection: " + dir);
+        }
+    }
+
+    /**
+     * Swaps FF with FR and RF with RR when flip is true; returns neighborDir unchanged when flip is false.
+     */
+    public byte flipDirection(byte neighborDir, boolean flip){
+        // No flip requested: the direction passes through untouched (and unvalidated).
+        if(!flip)
+            return neighborDir;
+        switch (neighborDir) {
+            case MessageFlag.DIR_FF:
+                return MessageFlag.DIR_FR;
+            case MessageFlag.DIR_FR:
+                return MessageFlag.DIR_FF;
+            case MessageFlag.DIR_RF:
+                return MessageFlag.DIR_RR;
+            case MessageFlag.DIR_RR:
+                return MessageFlag.DIR_RF;
+            default:
+                throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+        }
+    }
+
+    /**
+     * Applies the adjacency update described by incomingMsg to this vertex's value.
+     */
+    public void processUpdate(){
+        // Direction bits carried by the incoming flag, and their mirror image.
+        final byte dirFromFlag = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        final byte mirroredDir = mirrorDirection(dirFromFlag);
+
+        // NOTE(review): FLIP is tested on outFlag, not on the incoming message's flag;
+        // confirm that this is the intended source of the bit.
+        final boolean mustFlip = (outFlag & MessageFlag.FLIP) > 0;
+        final byte mergeDir = flipDirection(mirroredDir, mustFlip);
+
+        // Hand both directions, the sender id and the sender's neighbor id to the vertex value.
+        getVertexValue().processUpdates(mirroredDir, incomingMsg.getSourceVertexId(),
+                mergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
+    }
+
+    /**
+     * Merges with the single neighbor that sent incomingMsg and updates the adjacency lists accordingly.
+     */
+    public void processMerge(){
+        // Direction bits carried by the incoming flag, and their mirror image.
+        final byte dirFromFlag = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+        final byte mirroredDir = mirrorDirection(dirFromFlag);
+
+        // NOTE(review): FLIP is tested on outFlag, not on the incoming message's flag;
+        // confirm that this is the intended source of the bit.
+        final boolean mustFlip = (outFlag & MessageFlag.FLIP) > 0;
+        final byte mergeDir = flipDirection(mirroredDir, mustFlip);
+
+        // The vertex value performs the actual kmer merge and list rewiring.
+        getVertexValue().processMerges(mirroredDir, incomingMsg.getSourceVertexId(),
+                mergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+                kmerSize, incomingMsg.getKmer());
+    }
+
+    /**
+     * Same as the no-argument processMerge, but driven by the supplied message instead of incomingMsg.
+     */
+    public void processMerge(MessageWritable msg){
+        // Direction bits carried by the message flag, and their mirror image.
+        final byte dirFromFlag = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
+        final byte mirroredDir = mirrorDirection(dirFromFlag);
+
+        // NOTE(review): FLIP is tested on outFlag, not on msg's own flag;
+        // confirm that this is the intended source of the bit.
+        final boolean mustFlip = (outFlag & MessageFlag.FLIP) > 0;
+        final byte mergeDir = flipDirection(mirroredDir, mustFlip);
+
+        // The vertex value performs the actual kmer merge and list rewiring.
+        getVertexValue().processMerges(mirroredDir, msg.getSourceVertexId(),
+                mergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
+                kmerSize, msg.getKmer());
+    }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) { // no-op in this base class; subclasses such as LogAlgorithmForPathMergeVertex supply the logic
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index cff6cd7..429282b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -1,21 +1,16 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+import java.util.ArrayList;
import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
import edu.uci.ics.genomix.type.PositionWritable;
/*
* vertexId: BytesWritable
@@ -46,20 +41,13 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
+ BasicPathMergeVertex {
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
-
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
-
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
+ protected MessageWritable outgoingMsg2 = new MessageWritable();
+ protected PositionWritable destVertexId2 = new PositionWritable();
+
+ byte finalFlag;
/**
* initiate kmerSize, maxIteration
*/
@@ -68,195 +56,116 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
outgoingMsg.reset();
}
/**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
- getVertexValue().setState(State.START_VERTEX);
- //getVertexValue().setMergeChain(null);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(State.END_VERTEX);
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
- voteToHalt();
- } else
- voteToHalt();
- }
-
- /**
* head send message to path
*/
public void sendOutMsg() {
- if (getVertexValue().getState() == State.START_VERTEX) {
- outgoingMsg.setMessage(Message.START);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- } else if (getVertexValue().getState() != State.END_VERTEX) {
- outgoingMsg.setMessage(Message.NON);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ //send wantToMerge to next
+ destVertexId.set(getNextDestVertexId(getVertexValue()));
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, outgoingMsg);
+
+ ////send wantToMerge to prev
+ destVertexId2.set(getPreDestVertexId(getVertexValue()));
+ outgoingMsg2.setFlag(outFlag);
+ outgoingMsg2.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId2, outgoingMsg2);
+ }
+
+ /**
+ * check received message
+ */
+ public byte checkNumOfMsgsFromHead(){
+ int countHead = 0;
+ int countOldHead = 0;
+ for(int i = 0; i < receivedMsgList.size(); i++){
+ if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
+ countHead++;
+ if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
+ countOldHead++;
}
+ if(countHead == 0 && countOldHead == 0)
+ return MessageFromHead.BothMsgsFromNonHead;
+ else if(countHead == 2)
+ return MessageFromHead.BothMsgsFromHead;
+ else if(countOldHead == 2)
+ return MessageFromHead.BothMsgsFromOldHead;
+ else if(countHead == 1 && headFlag == 0)
+ return MessageFromHead.OneMsgFromHeadToNonHead;
+ else if(countHead == 1 && headFlag > 0)
+ return MessageFromHead.OneMsgFromHeadToHead;
+ else if(countOldHead == 1)
+ return MessageFromHead.OneMsgFromNonHead;
+
+ return MessageFromHead.NO_INFO;
}
/**
* head send message to path
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
- if (getSuperstep() == 3) {
- sendOutMsg();
- } else {
- if (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if (mergeChainVertex()) {
- if (incomingMsg.getMessage() == Message.END) {
- if (getVertexValue().getState() == State.START_VERTEX) {
- getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
- //System.out.println();
- } else
- getVertexValue().setState(State.END_VERTEX);
- } else
- sendOutMsg();
- }
+ //process merge when receiving msg
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ receivedMsgList.add(incomingMsg);
+ }
+ if(receivedMsgList.size() != 0){
+ byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
+ switch(numOfMsgsFromHead){
+ case MessageFromHead.BothMsgsFromNonHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ break;
+ case MessageFromHead.BothMsgsFromHead:
+ case MessageFromHead.BothMsgsFromOldHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ break;
+ case MessageFromHead.OneMsgFromHeadToNonHead:
+ for(int i = 0; i < 2; i++)
+ processMerge(receivedMsgList.get(i));
+ getVertexValue().setState(MessageFlag.IS_HEAD);
+ break;
+ case MessageFromHead.OneMsgFromHeadToHead: //stop condition
+ for(int i = 0; i < 2; i++){
+ if(headFlag > 0){
+ processMerge(receivedMsgList.get(i));
+ break;
+ }
+ }
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ //voteToHalt();
+ break;
+ case MessageFromHead.OneMsgFromNonHead:
+ //halt
+ //voteToHalt();
+ break;
}
}
+ //send out wantToMerge msg
+ resetHeadFlag();
+ finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
+ outFlag = (byte)(headFlag | finalFlag);
+ if(outFlag == 0)
+ sendOutMsg();
}
/**
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- if (msgIterator.hasNext()) {
+ while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.END);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
-
- if (incomingMsg.getMessage() == Message.START)
- deleteVertex(getVertexId());
- } else {
- if (getVertexValue().getState() != State.START_VERTEX && getVertexValue().getState() != State.END_VERTEX)
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ sendMergeMsg();
}
}
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public boolean mergeChainVertex() {
- //merge chain
- lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer);
- getVertexValue().setMergeChain(chainVertexId);
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getMergeChain()),
- chainVertexId, kmerSize)) {
- getVertexValue().setState(State.CYCLE);
- return false;
- }
- return true;
- }
-
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -266,10 +175,10 @@
initState(msgIterator);
else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- voteToHalt();
+ if(headFlag == 0)
+ voteToHalt();
} else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- voteToHalt();
} else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index 47ca06e..0b636ee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -131,20 +131,20 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
}
@@ -170,9 +170,9 @@
* set vertex state
*/
public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
+ if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
voteToHalt();
} else
@@ -185,8 +185,8 @@
public void mergeChainVertex() {
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(), lastKmer));
+ incomingMsg.getKmer()));
+ getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -201,7 +201,7 @@
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (incomingMsg.getMessage() != Message.STOP) {
+ if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
@@ -209,7 +209,7 @@
} else {
mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
}
}
@@ -222,9 +222,9 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
destVertexId.set(incomingMsg.getSourceVertexId());
sendMsg(destVertexId, outgoingMsg);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 949e8bb..69e1fb6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -138,20 +138,20 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
+ if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.START);
sendMsgToAllNextNodes(getVertexValue());
}
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
+ if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
+ outgoingMsg.setFlag(Message.END);
sendMsgToAllPreviousNodes(getVertexValue());
}
if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setFlag(Message.START);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
+ outgoingMsg.setFlag(Message.END);
sendMsg(getVertexId(), outgoingMsg); //send to itself
}
}
@@ -191,9 +191,9 @@
* set vertex state
*/
public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
+ if (incomingMsg.getFlag() == Message.START) {
getVertexValue().setState(State.START_VERTEX);
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ } else if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.END_VERTEX);
voteToHalt();
} else
@@ -205,7 +205,7 @@
*/
public void markPseudoHead() {
getVertexValue().setState(State.PSEUDOHEAD);
- outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
destVertexId
.set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -215,12 +215,12 @@
* mark the pseudoRear
*/
public void markPseudoRear() {
- if (incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ if (incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
&& getVertexValue().getState() != State.START_VERTEX) {
getVertexValue().setState(State.PSEUDOREAR);
voteToHalt();
}
- else if(incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ else if(incomingMsg.getFlag() == Message.FROMPSEUDOHEAD
&& getVertexValue().getState() == State.START_VERTEX){
getVertexValue().setState(State.START_HALT);
}
@@ -231,9 +231,9 @@
*/
public void mergeChainVertex(){
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- getVertexValue().setMergeChain(
- kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ incomingMsg.getKmer()));
+ getVertexValue().setKmer(
+ kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -249,7 +249,7 @@
} else {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if (incomingMsg.getMessage() != Message.STOP) {
+ if (incomingMsg.getFlag() != Message.STOP) {
mergeChainVertex();
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId
@@ -258,7 +258,7 @@
} else {
mergeChainVertex();
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
}
}
@@ -271,9 +271,9 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
}
@@ -292,10 +292,10 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
//if from pseudoHead, voteToHalt(), otherwise ...
- if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){
+ if (incomingMsg.getFlag() != Message.FROMPSEUDOHEAD){
mergeChainVertex();
- if (incomingMsg.getMessage() != Message.STOP
- && incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
+ if (incomingMsg.getFlag() != Message.STOP
+ && incomingMsg.getFlag() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -303,12 +303,12 @@
} else {
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.STOP) {
+ && incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
- && incomingMsg.getMessage() == Message.STOP)
+ && incomingMsg.getFlag() == Message.STOP)
getVertexValue().setState(State.END_VERTEX);
}
}
@@ -321,15 +321,15 @@
*/
public void responseMsgToHeadVertexPartitionPhase() {
if (getVertexValue().getState() == State.PSEUDOHEAD)
- outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ outgoingMsg.setFlag(Message.FROMPSEUDOHEAD);
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
if (getVertexValue().getState() == State.PSEUDOREAR)
- outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
+ outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
+ outgoingMsg.setFlag(Message.STOP);
}
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
voteToHalt();
@@ -345,12 +345,12 @@
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.STOP) {
+ && incomingMsg.getFlag() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
+ //String source = getVertexValue().getKmer().toString();
//System.out.println();
} else if(getVertexValue().getState() == State.PSEUDOHEAD
- && incomingMsg.getMessage() == Message.STOP)
+ && incomingMsg.getFlag() == Message.STOP)
getVertexValue().setState(State.END_VERTEX);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index b6c03fb..f250afc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
@@ -14,8 +15,6 @@
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.AdjMessage;
-import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
@@ -52,13 +51,9 @@
* Naive Algorithm for path merge graph
*/
public class P4ForPathMergeVertex extends
- Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
- public static final String KMER_SIZE = "P4ForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "P4ForPathMergeVertex.iteration";
+ BasicPathMergeVertex {
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- public static int kmerSize = -1;
- private int maxIteration = -1;
private static long randSeed = -1;
private float probBeingRandomHead = -1;
@@ -74,14 +69,7 @@
private boolean prevHead;
private byte headFlag;
private byte tailFlag;
- private byte outFlag;
- private byte outUpFlag;
-
- private MessageWritable incomingMsg = new MessageWritable();
- private MessageWritable outgoingMsg = new MessageWritable();
- private MessageWritable outgoingUpMsg = new MessageWritable();
- private PositionWritable destVertexId = new PositionWritable();
- private Iterator<PositionWritable> posIterator;
+ private byte selfFlag;
/**
* initiate kmerSize, maxIteration
@@ -101,6 +89,7 @@
curHead = false;
nextHead = false;
prevHead = false;
+ outFlag = (byte)0;
outgoingMsg.reset();
}
@@ -144,256 +133,6 @@
return false;
}
- /**
- * get destination vertex
- */
- public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0) // #FFList() > 0
- posIterator = value.getFFList().iterator();
- else // #FRList() > 0
- posIterator = value.getFRList().iterator();
- return posIterator.next();
- }
-
- public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0) // #RFList() > 0
- posIterator = value.getRFList().iterator();
- else // #RRList() > 0
- posIterator = value.getRRList().iterator();
- return posIterator.next();
- }
-
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(VertexValueWritable value) {
- posIterator = value.getFFList().iterator(); // FFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getFRList().iterator(); // FRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
- posIterator = value.getRFList().iterator(); // RFList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- posIterator = value.getRRList().iterator(); // RRList
- while(posIterator.hasNext()){
- destVertexId.set(posIterator.next());
- sendMsg(destVertexId, outgoingMsg);
- }
- }
-
- /**
- * start sending message
- */
- public void startSendMsg() {
- if (VertexUtil.isHeadVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isRearVertex(getVertexValue())) {
- outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexValue());
- voteToHalt();
- }
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())){
- outgoingMsg.setMessage(Message.START);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setMessage(Message.END);
- sendMsg(getVertexId(), outgoingMsg); //send to itself
- voteToHalt();
- }
- }
-
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<MessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState() {
- if (incomingMsg.getMessage() == Message.START) {
- getVertexValue().setState(MessageFlag.IS_HEAD); //State.START_VERTEX
- } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
- getVertexValue().setState(MessageFlag.IS_TAIL);
- getVertexValue().setMergeChain(getVertexValue().getMergeChain());
- //voteToHalt();
- } //else
- //voteToHalt();
- }
-
- /**
- * check if A need to be flipped with successor
- */
- public boolean ifFilpWithSuccessor(){
- if(getVertexValue().getFRList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * check if A need to be filpped with predecessor
- */
- public boolean ifFlipWithPredecessor(){
- if(getVertexValue().getRFList().getLength() > 0)
- return true;
- else
- return false;
- }
-
- /**
- * set adjMessage to successor(from predecessor)
- */
- public void setSuccessorAdjMsg(){
- if(getVertexValue().getFFList().getLength() > 0)
- outgoingMsg.setAdjMessage(AdjMessage.FROMFF);
- else
- outgoingMsg.setAdjMessage(AdjMessage.FROMFR);
- }
-
- /**
- * set adjMessage to predecessor(from successor)
- */
- public void setPredecessorAdjMsg(){
- if(getVertexValue().getRFList().getLength() > 0)
- outgoingMsg.setAdjMessage(AdjMessage.FROMRF);
- else
- outgoingMsg.setAdjMessage(AdjMessage.FROMRR);
- }
-
- /**
- * send update message to neighber
- */
- public void broadcastUpdateMsg(){
- outFlag |= MessageFlag.FROM_PREDECESSOR;
- outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
- if(ifFlipWithPredecessor())
- outFlag |= MessageFlag.FLIP;
- outgoingMsg.setMessage(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- setSuccessorAdjMsg();
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
- outUpFlag = (byte)(MessageFlag.FROM_SUCCESSOR);
- outgoingUpMsg.setNeighberNode(getVertexValue().getOutgoingList());
- if(ifFilpWithSuccessor())
- outFlag |= MessageFlag.FLIP;
- outgoingUpMsg.setMessage(outUpFlag);
- outgoingUpMsg.setSourceVertexId(getVertexId());
- setPredecessorAdjMsg();
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingUpMsg);
- //remove its own neighbers
- getVertexValue().setIncomingList(null);
- getVertexValue().setOutgoingList(null);
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- */
- public void sendUpMsgFromPredecessor(){
- getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHNEXT);
- if(getVertexValue().getFFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getFFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getFRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * This vertex tries to merge with next vertex and send update msg to neighber
- */
- public void sendUpMsgFromSuccessor(){
- getVertexValue().setState(MessageFlag.SHOULD_MERGEWITHPREV);
- if(getVertexValue().getRFList().getLength() > 0)
- getVertexValue().setMergeDest(getVertexValue().getRFList().getPosition(0));
- else
- getVertexValue().setMergeDest(getVertexValue().getRRList().getPosition(0));
- broadcastUpdateMsg();
- }
-
- /**
- * updateAdjList
- */
- public void updateAdjList(){
- if(incomingMsg.getAdjMessage() == AdjMessage.FROMFF){
- getVertexValue().setRRList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRFList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMFR){
- getVertexValue().setFRList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFFList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRF){
- getVertexValue().setRFList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setRRList(incomingMsg.getNeighberNode().getReverseList());
- } else if(incomingMsg.getAdjMessage() == AdjMessage.FROMRR){
- getVertexValue().setFFList(null); //may replace setNull with remove
- if(incomingMsg.getNeighberNode().getForwardList().getLength() > 0)
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getForwardList());
- else
- getVertexValue().setFRList(incomingMsg.getNeighberNode().getReverseList());
- }
- }
-
- /**
- * update AdjacencyList if message from predecessor
- */
- public void updateAdjList_MsgPredecessor(){
- if((outFlag & MessageFlag.FLIP) > 0){
- updateAdjList();
- } else {
- getVertexValue().setIncomingList(incomingMsg.getNeighberNode());
- }
- }
-
- /**
- * update AdjacencyList if message from successor
- */
- public void updateAdjList_MsgSuccessor(){
- if((outFlag & MessageFlag.FLIP) > 0){
- updateAdjList();
- } else {
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- }
- }
-
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -416,18 +155,14 @@
// We prevent merging towards non-path nodes
hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
- if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) {
- getVertexValue().setState(outFlag);
- voteToHalt();
- }
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor(); //TODO up -> update From -> to
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
- sendUpMsgFromSuccessor();
+ sendUpMsgToSuccessor();
}
} else {
// I'm a tail
@@ -435,19 +170,19 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- sendUpMsgFromSuccessor();
+ sendUpMsgToSuccessor();
}
}
}
@@ -457,44 +192,23 @@
//update neighber
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- outFlag = incomingMsg.getMessage();
- if((outFlag & MessageFlag.FROM_PREDECESSOR) > 0){
- updateAdjList_MsgPredecessor();
- }
- else {//Message from successor.
- updateAdjList_MsgSuccessor();
- }
+ processUpdate();
}
} else if (getSuperstep() % 4 == 1){
//send message to the merge object and kill self
- if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHNEXT) > 0){
- setSuccessorAdjMsg();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
- deleteVertex(getVertexId());
- } else if((getVertexValue().getState() | MessageFlag.SHOULD_MERGEWITHPREV) > 0){
- setPredecessorAdjMsg();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- sendMsg(getVertexValue().getMergeDest(), outgoingMsg);
- deleteVertex(getVertexId());
- }
+ broadcastMergeMsg();
+ deleteVertex(getVertexId());
} else if (getSuperstep() % 4 == 2){
//merge kmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- outFlag = incomingMsg.getMessage();
- if(outFlag == AdjMessage.FROMFF){
- //mergeWithRR(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMFR){
- //mergeWithRF(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMRF){
- //mergeWithFR(incomingMsg.getChain())
- }
- else if(outFlag == AdjMessage.FROMRR){
- //mergeWithFF(incomingMsg.getChain())
- }
+ processMerge();
+
+ //head meets head, stop
+ headFlag = (byte) (MessageFlag.IS_HEAD & incomingMsg.getFlag());
+ selfFlag = (byte) (MessageFlag.IS_HEAD & getVertexValue().getState());
+ if((headFlag & selfFlag) > 0)
+ voteToHalt();
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
new file mode 100644
index 0000000..27eb40a
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -0,0 +1,501 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.PositionWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: PositionWritable
+ * vertexValue: VertexValueWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * This means that the vertexId is ACG, its succeed (successor) node is A and its precursor (predecessor) node is C.
+ * The succeed node and precursor node are stored in vertexValue; edgeValue is not used.
+ * The details about the message are in edu.uci.ics.genomix.pregelix.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class P5ForPathMergeVertex extends
+ Vertex<PositionWritable, VertexValueWritable, NullWritable, MessageWritable> {
+ public static final String KMER_SIZE = "P5ForPathMergeVertex.kmerSize"; // config key read by initVertex()
+ public static final String ITERATIONS = "P5ForPathMergeVertex.iteration"; // config key read by initVertex()
+ public static final String RANDSEED = "P5ForPathMergeVertex.randSeed"; // NOTE(review): initVertex() reads the literal "randomSeed", not this key
+ public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead"; // NOTE(review): carries the P4 prefix and is not read in this class -- confirm the intended key
+ public static int kmerSize = -1; // -1 until loaded from the job configuration
+ private int maxIteration = -1; // negative until loaded from the job configuration
+
+ private static long randSeed = -1; // negative until loaded from the job configuration
+ private float probBeingRandomHead = -1; // negative until loaded; compared against Random.nextFloat()
+ private Random randGenerator; // re-created on every initVertex() call
+
+ private PositionWritable curID = new PositionWritable(); // id of this vertex, set in compute()
+ private PositionWritable nextID = new PositionWritable(); // filled by setNextInfo()
+ private PositionWritable prevID = new PositionWritable(); // filled by setPrevInfo()
+ private boolean hasNext; // reset in initVertex(), computed in compute()
+ private boolean hasPrev; // reset in initVertex(), computed in compute()
+ private boolean curHead; // isNodeRandomHead(curID)
+ private boolean nextHead; // isNodeRandomHead(nextID)
+ private boolean prevHead; // isNodeRandomHead(prevID)
+ private byte headFlag; // State.START_VERTEX bits of the vertex state, see compute()
+ private byte tailFlag; // State.END_VERTEX bits of the vertex state, see compute()
+ private byte outFlag; // flag byte written into outgoingMsg by broadcastUpdateMsg()
+
+ private MessageWritable incomingMsg = new MessageWritable(); // message currently being processed
+ private MessageWritable outgoingMsg = new MessageWritable(); // reused for every send; reset in initVertex()
+ private PositionWritable destVertexId = new PositionWritable(); // scratch destination id for the sendMsgToAll* helpers
+ private Iterator<PositionWritable> posIterator; // scratch iterator shared by the neighbor helpers
+
+ /**
+  * Lazily loads the configuration values, then resets the neighbor flags and the outgoing message.
+  */
+ public void initVertex() {
+     if (kmerSize == -1) {
+         kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+     }
+     if (maxIteration < 0) {
+         maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+     }
+     if (randSeed < 0) {
+         randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+     }
+     randGenerator = new Random(randSeed);
+     if (probBeingRandomHead < 0) {
+         probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
+     }
+     hasNext = hasPrev = curHead = nextHead = prevHead = false;
+     outgoingMsg.reset();
+ }
+
+ protected boolean isNodeRandomHead(PositionWritable nodeID) { // deterministic for a given randSeed and nodeID.hashCode()
+     final long nodeSeed = randSeed ^ nodeID.hashCode();
+     randGenerator.setSeed(nodeSeed);
+     return probBeingRandomHead > randGenerator.nextFloat();
+ }
+
+ /**
+  * Stores the first FF neighbor (or, failing that, the first FR neighbor) in nextID and
+  * records in nextHead whether that neighbor is a random head.
+  *
+  * @param value vertex value whose FF/FR lists are inspected
+  * @return true when a next neighbor was found, false when both lists are empty
+  */
+ protected boolean setNextInfo(VertexValueWritable value) {
+     final boolean viaFF = value.getFFList().getCountOfPosition() > 0;
+     if (!viaFF && value.getFRList().getCountOfPosition() <= 0) {
+         return false; // no next neighbor at all
+     }
+     nextID.set(viaFF ? value.getFFList().getPosition(0) : value.getFRList().getPosition(0));
+     nextHead = isNodeRandomHead(nextID);
+     return true;
+ }
+
+ /**
+  * Stores the first RR neighbor (or, failing that, the first RF neighbor) in prevID and
+  * records in prevHead whether that neighbor is a random head.
+  *
+  * @param value vertex value whose RR/RF lists are inspected
+  * @return true when a previous neighbor was found, false when both lists are empty
+  */
+ protected boolean setPrevInfo(VertexValueWritable value) {
+     final boolean viaRR = value.getRRList().getCountOfPosition() > 0;
+     if (!viaRR && value.getRFList().getCountOfPosition() <= 0) {
+         return false; // no previous neighbor at all
+     }
+     prevID.set(viaRR ? value.getRRList().getPosition(0) : value.getRFList().getPosition(0));
+     prevHead = isNodeRandomHead(prevID);
+     return true;
+ }
+
+ /**
+  * Returns the first entry of the FF list, or of the FR list when the FF list is empty.
+  * The iterator used is kept in posIterator.
+  */
+ public PositionWritable getNextDestVertexId(VertexValueWritable value) {
+     final boolean hasFF = value.getFFList().getCountOfPosition() > 0;
+     posIterator = hasFF ? value.getFFList().iterator()
+             : value.getFRList().iterator(); // FF empty: fall back to FR
+     return posIterator.next();
+ }
+
+ public PositionWritable getPreDestVertexId(VertexValueWritable value) {
+     // First RF entry when the RF list is non-empty, otherwise the first RR entry.
+     final boolean hasRF = value.getRFList().getCountOfPosition() > 0;
+     posIterator = hasRF ? value.getRFList().iterator()
+             : value.getRRList().iterator();
+     return posIterator.next();
+ }
+
+ /**
+  * Sends outgoingMsg to every position in the FF list, then to every position in the FR list.
+  *
+  * @param value vertex value supplying the FF and FR lists
+  */
+ public void sendMsgToAllNextNodes(VertexValueWritable value) {
+     for (posIterator = value.getFFList().iterator(); posIterator.hasNext();) {
+         destVertexId.set(posIterator.next());
+         sendMsg(destVertexId, outgoingMsg);
+     }
+     for (posIterator = value.getFRList().iterator(); posIterator.hasNext();) {
+         destVertexId.set(posIterator.next());
+         sendMsg(destVertexId, outgoingMsg);
+     }
+ }
+
+ /**
+  * Sends outgoingMsg to every position in the RF list, then to every position in the RR list.
+  *
+  * @param value vertex value supplying the RF and RR lists
+  */
+ public void sendMsgToAllPreviousNodes(VertexValueWritable value) {
+     for (posIterator = value.getRFList().iterator(); posIterator.hasNext();) {
+         destVertexId.set(posIterator.next());
+         sendMsg(destVertexId, outgoingMsg);
+     }
+     for (posIterator = value.getRRList().iterator(); posIterator.hasNext();) {
+         destVertexId.set(posIterator.next());
+         sendMsg(destVertexId, outgoingMsg);
+     }
+ }
+
+ /** Superstep-1 kick-off: sends START/END flags to neighbors, or to this vertex itself for the Without* cases. */
+ public void startSendMsg() {
+     final VertexValueWritable self = getVertexValue();
+     // The four checks are evaluated independently (no else), in this order.
+     if (VertexUtil.isHeadVertexWithIndegree(self)) {
+         outgoingMsg.setFlag(Message.START);
+         sendMsgToAllNextNodes(self);
+         voteToHalt();
+     }
+     if (VertexUtil.isRearVertexWithOutdegree(self)) {
+         outgoingMsg.setFlag(Message.END);
+         sendMsgToAllPreviousNodes(self);
+         voteToHalt();
+     }
+     if (VertexUtil.isHeadWithoutIndegree(self)) {
+         outgoingMsg.setFlag(Message.START);
+         sendMsg(getVertexId(), outgoingMsg); // addressed to this vertex
+         voteToHalt();
+     }
+     if (VertexUtil.isRearWithoutOutdegree(self)) {
+         outgoingMsg.setFlag(Message.END);
+         sendMsg(getVertexId(), outgoingMsg); // addressed to this vertex
+         voteToHalt();
+     }
+ }
+
+ /**
+  * Consumes every pending message: eligible vertices record it via setState(), others vote to halt.
+  */
+ public void initState(Iterator<MessageWritable> msgIterator) {
+     while (msgIterator.hasNext()) {
+         final boolean eligible = VertexUtil.isPathVertex(getVertexValue())
+                 || VertexUtil.isHeadWithoutIndegree(getVertexValue()) || VertexUtil.isRearWithoutOutdegree(getVertexValue());
+         if (eligible) {
+             incomingMsg = msgIterator.next();
+             setState();
+         } else {
+             msgIterator.next(); // discard the message
+             voteToHalt();
+         }
+     }
+ }
+
+ /**
+  * Marks the vertex IS_HEAD on a START message, or IS_TAIL on an END message unless its state is START_VERTEX.
+  */
+ public void setState() {
+     if (incomingMsg.getFlag() == Message.START) {
+         getVertexValue().setState(MessageFlag.IS_HEAD);
+         return;
+     }
+     if (incomingMsg.getFlag() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+         getVertexValue().setState(MessageFlag.IS_TAIL);
+         getVertexValue().setKmer(getVertexValue().getKmer()); // NOTE(review): looks like a self-assignment -- confirm it is needed
+     }
+ }
+
+ /**
+  * Reports whether this vertex needs to be flipped with its successor.
+  *
+  * @return true when the FR list's getLength() is positive
+  */
+ public boolean ifFilpWithSuccessor(){
+     final boolean hasFR = getVertexValue().getFRList().getLength() > 0;
+     return hasFR;
+ }
+
+ /**
+  * Reports whether this vertex needs to be flipped with its predecessor.
+  *
+  * @return true when the RF list's getLength() is positive
+  */
+ public boolean ifFlipWithPredecessor(){
+     final boolean hasRF = getVertexValue().getRFList().getLength() > 0;
+     return hasRF;
+ }
+
+ /**
+  * ORs the direction towards the successor into outFlag.
+  * DIR_FF is used when the FF list's getLength() is positive,
+  * DIR_FR otherwise.
+  */
+ public void setSuccessorAdjMsg(){
+     final boolean viaFF = getVertexValue().getFFList().getLength() > 0;
+     outFlag |= viaFF ? MessageFlag.DIR_FF : MessageFlag.DIR_FR;
+ }
+
+ /**
+  * ORs the direction towards the predecessor into outFlag: DIR_RF when the RF list's
+  * getLength() is positive, DIR_RR otherwise. The else branch previously repeated DIR_RF,
+  * so a predecessor reached through the RR list was reported with the wrong direction.
+  */
+ public void setPredecessorAdjMsg(){
+     final boolean viaRF = getVertexValue().getRFList().getLength() > 0;
+     outFlag |= viaRF ? MessageFlag.DIR_RF : MessageFlag.DIR_RR;
+ }
+
+ /**
+ * Sends one update message (flag, adjacency list, source id) to the neighbor on the side opposite the merge.
+ * NOTE(review): the state is masked with 0b0001 before matching SHOULD_MERGEWITH* -- confirm the mask covers those bits.
+ */
+ public void broadcastUpdateMsg(){
+ switch(getVertexValue().getState() & 0b0001){
+ case MessageFlag.SHOULD_MERGEWITHPREV: // merging backwards: tell the next node about the incoming list
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ case MessageFlag.SHOULD_MERGEWITHNEXT: // merging forwards: tell the previous node about the outgoing list
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outFlag |= MessageFlag.FLIP;
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ break;
+ } // any other masked value sends nothing
+ }
+
+ /**
+  * Prepares a merge with the next vertex.
+  * ORs SHOULD_MERGEWITHNEXT into the vertex state, records the first FF position
+  * (or the first FR position when the FF list's getLength() is not positive) as the
+  * merge destination, then calls broadcastUpdateMsg().
+  */
+ public void sendUpMsgFromPredecessor(){
+     final VertexValueWritable self = getVertexValue();
+     self.setState((byte) (self.getState() | MessageFlag.SHOULD_MERGEWITHNEXT));
+     final boolean viaFF = self.getFFList().getLength() > 0;
+     self.setMergeDest(viaFF ? self.getFFList().getPosition(0)
+             : self.getFRList().getPosition(0));
+     broadcastUpdateMsg();
+ }
+
+ /**
+  * Prepares a merge with the previous vertex.
+  * ORs SHOULD_MERGEWITHPREV into the vertex state, records the first RF position
+  * (or the first RR position when the RF list's getLength() is not positive) as the
+  * merge destination, then calls broadcastUpdateMsg().
+  */
+ public void sendUpMsgFromSuccessor(){
+     final VertexValueWritable self = getVertexValue();
+     self.setState((byte) (self.getState() | MessageFlag.SHOULD_MERGEWITHPREV));
+     final boolean viaRF = self.getRFList().getLength() > 0;
+     self.setMergeDest(viaRF ? self.getRFList().getPosition(0)
+             : self.getRRList().getPosition(0));
+     broadcastUpdateMsg();
+ }
+
+ /**
+  * Returns the edge dir for B->A when the A->B edge is type dir (FF<->RR; FR and RF are their own mirror).
+  */
+ public byte mirrorDirection(byte dir) {
+     switch (dir) {
+         case MessageFlag.DIR_FF:
+             return MessageFlag.DIR_RR;
+         case MessageFlag.DIR_FR:
+             return MessageFlag.DIR_FR;
+         case MessageFlag.DIR_RF:
+             return MessageFlag.DIR_RF;
+         case MessageFlag.DIR_RR:
+             return MessageFlag.DIR_FF;
+         default: // the message previously named flipDirection, pointing at the wrong method
+             throw new RuntimeException("Unrecognized direction in mirrorDirection: " + dir);
+     }
+ }
+
+ /**
+  * Toggles the second letter of neighborDir (FF<->FR, RF<->RR) when flip is true, else returns it
+  * unchanged; throws RuntimeException for an unrecognized direction when flipping.
+  */
+ public byte flipDirection(byte neighborDir, boolean flip){
+     if(!flip)
+         return neighborDir;
+     switch (neighborDir) {
+         case MessageFlag.DIR_FF:
+             return MessageFlag.DIR_FR;
+         case MessageFlag.DIR_FR:
+             return MessageFlag.DIR_FF;
+         case MessageFlag.DIR_RF:
+             return MessageFlag.DIR_RR;
+         case MessageFlag.DIR_RR:
+             return MessageFlag.DIR_RF;
+         default:
+             throw new RuntimeException("Unrecognized direction for neighborDir: " + neighborDir);
+     }
+ }
+
+ /**
+  * Applies the adjacency update carried by incomingMsg to this vertex's value.
+  * The sender's direction bits are mirrored to get the neighbor-to-me direction, which is
+  * additionally flipped when outFlag has FLIP set.
+  * NOTE(review): FLIP is read from this vertex's outFlag, not from incomingMsg -- confirm intended.
+  */
+ public void processUpdate(){
+     final byte towardNeighbor = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+     final byte fromNeighbor = mirrorDirection(towardNeighbor);
+
+     final boolean mustFlip = (outFlag & MessageFlag.FLIP) > 0;
+     final byte towardMerge = flipDirection(fromNeighbor, mustFlip);
+
+     // pass both directions, the sender id and the forwarded neighbor id to the vertex value
+     getVertexValue().processUpdates(fromNeighbor, incomingMsg.getSourceVertexId(),
+             towardMerge, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()));
+ }
+
+ /**
+  * Applies the merge carried by incomingMsg (directions, neighbor id and kmer) to this vertex's value.
+  * The sender's direction bits are mirrored to get the neighbor-to-me direction, which is
+  * additionally flipped when outFlag has FLIP set.
+  * NOTE(review): FLIP is read from this vertex's outFlag, not from incomingMsg -- confirm intended.
+  */
+ public void processMerge(){
+     final byte towardNeighbor = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+     final byte fromNeighbor = mirrorDirection(towardNeighbor);
+
+     final boolean mustFlip = (outFlag & MessageFlag.FLIP) > 0;
+     final byte towardMerge = flipDirection(fromNeighbor, mustFlip);
+
+     // pass both directions, the sender id, the forwarded neighbor id, kmerSize and the kmer
+     getVertexValue().processMerges(fromNeighbor, incomingMsg.getSourceVertexId(),
+             towardMerge, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
+             kmerSize, incomingMsg.getKmer());
+ }
+
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) { // per-superstep entry point; dispatches on getSuperstep()
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if (getSuperstep() == 2)
+ initState(msgIterator);
+ else if (getSuperstep() % 4 == 3){ // supersteps with any other remainder do nothing in this class
+ // Node may be marked as head b/c it's a real head or a real tail
+ headFlag = (byte) (State.START_VERTEX & getVertexValue().getState()); // NOTE(review): setState() stores MessageFlag.IS_HEAD/IS_TAIL -- confirm the State.* masks match
+ tailFlag = (byte) (State.END_VERTEX & getVertexValue().getState());
+ outFlag = (byte) (headFlag | tailFlag);
+
+ // only PATH vertices are present. Find the ID's for my neighbors
+ curID.set(getVertexId());
+
+ curHead = isNodeRandomHead(curID);
+
+ // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(getVertexValue()) && tailFlag == 0;
+ hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
+ if ((outFlag & MessageFlag.IS_HEAD) > 0 && (outFlag & MessageFlag.IS_TAIL) > 0) { // both markers set
+ getVertexValue().setState(outFlag);
+ voteToHalt(); // no return here: execution continues with the checks below
+ }
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ sendUpMsgFromPredecessor();
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ sendUpMsgFromSuccessor();
+ }
+ } else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curID.compareTo(nextID) < 0) {
+ // merge towards tail in forward dir
+ sendUpMsgFromPredecessor();
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curID.compareTo(prevID) < 0) {
+ // merge towards tail in reverse dir
+ sendUpMsgFromSuccessor();
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ /** Configures a Pregelix job that runs this vertex class and hands it to Client.run. */
+ public static void main(String[] args) throws Exception {
+     final String jobName = P5ForPathMergeVertex.class.getSimpleName();
+     final PregelixJob pathMergeJob = new PregelixJob(jobName);
+     pathMergeJob.setVertexClass(P5ForPathMergeVertex.class);
+     // binary input and output formats
+     pathMergeJob.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+     pathMergeJob.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+     pathMergeJob.setDynamicVertexValueSize(true);
+     pathMergeJob.setOutputKeyClass(PositionWritable.class);
+     pathMergeJob.setOutputValueClass(VertexValueWritable.class);
+     Client.run(args, pathMergeJob);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
index 59a5efb..668633c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipAddVertex.java
@@ -85,7 +85,7 @@
byte[] array = { 'G', 'A', 'A'};
KmerBytesWritable kmer = new KmerBytesWritable(array.length);
kmer.setByRead(array, 0);
- vertexValue.setMergeChain(kmer);
+ vertexValue.setKmer(kmer);
PositionListWritable plist = new PositionListWritable();
plist.append(new PositionWritable(1, (byte)4));
vertexValue.setRRList(plist);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 61e7ae9..dec2d48 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -95,11 +95,11 @@
initVertex();
if(getSuperstep() == 1){
if(VertexUtil.isIncomingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
if(getVertexValue().getFFList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMFF);
+ outgoingMsg.setFlag(AdjMessage.FROMFF);
else if(getVertexValue().getFRList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMFR);
+ outgoingMsg.setFlag(AdjMessage.FROMFR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getNextDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -107,11 +107,11 @@
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() <= length){
+ if(getVertexValue().getLengthOfKmer() <= length){
if(getVertexValue().getRFList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMRF);
+ outgoingMsg.setFlag(AdjMessage.FROMRF);
else if(getVertexValue().getRRList().getCountOfPosition() > 0)
- outgoingMsg.setMessage(AdjMessage.FROMRR);
+ outgoingMsg.setFlag(AdjMessage.FROMRR);
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getPreDestVertexId(getVertexValue()));
sendMsg(destVertexId, outgoingMsg);
@@ -119,14 +119,14 @@
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getLengthOfMergeChain() > length)
+ if(getVertexValue().getLengthOfKmer() > length)
deleteVertex(getVertexId());
}
}
else if(getSuperstep() == 2){
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() == AdjMessage.FROMFF){
+ if(incomingMsg.getFlag() == AdjMessage.FROMFF){
//remove incomingMsg.getSourceId from RR positionList
iterator = getVertexValue().getRRList().iterator();
while(iterator.hasNext()){
@@ -136,7 +136,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMFR){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
//remove incomingMsg.getSourceId from RF positionList
iterator = getVertexValue().getRFList().iterator();
while(iterator.hasNext()){
@@ -146,7 +146,7 @@
break;
}
}
- } else if(incomingMsg.getMessage() == AdjMessage.FROMRF){
+ } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
//remove incomingMsg.getSourceId from FR positionList
iterator = getVertexValue().getFRList().iterator();
while(iterator.hasNext()){
@@ -156,7 +156,7 @@
break;
}
}
- } else{ //incomingMsg.getMessage() == AdjMessage.FROMRR
+ } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
//remove incomingMsg.getSourceId from FF positionList
iterator = getVertexValue().getFFList().iterator();
while(iterator.hasNext()){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
index 8669ecd..9c20ce6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/ConvertNodeToIdValue.java
@@ -38,7 +38,7 @@
outputValue.setFRList(node.getFRList());
outputValue.setRFList(node.getRFList());
outputValue.setRRList(node.getRRList());
- outputValue.setMergeChain(node.getKmer());
+ outputValue.setKmer(node.getKmer());
outputValue.setState(State.NON_VERTEX);
writer.append(outputKey, outputValue);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index 9dd5162..f31c1d0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -60,7 +60,7 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() != -1 && value.getLengthOfMergeChain() <= maxLength) {
+ if (value.getLengthOfKmer() != -1 && value.getLengthOfKmer() <= maxLength) {
bw.write(value.toString());
bw.newLine();
}
@@ -84,7 +84,7 @@
if (key == null || value == null) {
break;
}
- if (value.getLengthOfMergeChain() != -1 && value.getLengthOfMergeChain() <= maxLength
+ if (value.getLengthOfKmer() != -1 && value.getLengthOfKmer() <= maxLength
&& value.getState() == State.FINAL_VERTEX) {
bw.write(key.toString() + "\t" + value.toString());
bw.newLine();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
index 0a5428b..c130ec6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFlag.java
@@ -1,31 +1,24 @@
package edu.uci.ics.genomix.pregelix.type;
-public class MessageFlag {
- public static final byte FLIP = 0;
- public static final byte IS_HEAD = 1 << 1;
- public static final byte IS_TAIL = 1 << 2;
- public static final byte SHOULD_MERGEWITHNEXT = 1 << 3;
- public static final byte SHOULD_MERGEWITHPREV = 1 << 4;
- public static final byte FROM_SUCCESSOR = 1 << 5;
- public static final byte FROM_PREDECESSOR = 1 << 6;
+import edu.uci.ics.genomix.type.NodeWritable.MergeDirFlag;
+
+public class MessageFlag extends MergeDirFlag{
+
+ public static final byte FLIP = 1 << 3;
+ public static final byte IS_HEAD = 1 << 4;
+ public static final byte IS_TAIL = 1 << 5;
+ public static final byte IS_OLDHEAD = 1 << 6;
+ public static final byte IS_FINAL = 0b000011;
public static String getFlagAsString(byte code) {
// TODO: allow multiple flags to be set
switch (code) {
case IS_HEAD:
return "IS_HEAD";
- case IS_TAIL:
- return "IS_TAIL";
- case SHOULD_MERGEWITHNEXT:
- return "SHOULD_MERGEWITHNEXT";
- case SHOULD_MERGEWITHPREV:
- return "SHOULD_MERGEWITHPREV";
+ case IS_OLDHEAD:
+ return "IS_OLDHEAD";
case FLIP:
return "FLIP";
- case FROM_SUCCESSOR:
- return "FROM_SUCCESSOR";
- case FROM_PREDECESSOR:
- return "FROM_PREDECESSOR";
}
return "ERROR_BAD_MESSAGE";
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
new file mode 100644
index 0000000..f343c2e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.genomix.pregelix.type;
+
+public class MessageFromHead {
+ public static final byte BothMsgsFromHead = 1 << 0;
+ public static final byte BothMsgsFromNonHead = 1 << 1;
+ public static final byte BothMsgsFromOldHead = 1 << 2;
+ public static final byte OneMsgFromHead = 1 << 3;
+ public static final byte OneMsgFromNonHead = 1 << 4;
+ public static final byte OneMsgFromHeadToNonHead = 1 << 5;
+ public static final byte OneMsgFromHeadToHead = 1 << 6;
+
+ public static final byte NO_INFO = 0 << 0;
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 67c9792..e9197db 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -1,7 +1,9 @@
package edu.uci.ics.genomix.pregelix.util;
+import edu.uci.ics.genomix.pregelix.io.AdjacencyListWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
public class VertexUtil {
/**
@@ -14,21 +16,19 @@
}
/**
- * Head Vertex: out-degree > 0,
- *
- * @param vertexValue
+ * Head Vertex: out-degree > 0 and not a path vertex
*/
- public static boolean isHeadVertex(VertexValueWritable value) {
- return value.outDegree() > 0 && !isPathVertex(value) && !isHeadWithoutIndegree(value);
+ public static boolean isHead(VertexValueWritable value){
+ return value.outDegree() > 0 && !isPathVertex(value);
}
-
+
/**
- * Rear Vertex: in-degree > 0,
+ * Head Vertex: out-degree > 0, and has indegree
*
* @param vertexValue
*/
- public static boolean isRearVertex(VertexValueWritable value) {
- return value.inDegree() > 0 && !isPathVertex(value) && !isRearWithoutOutdegree(value);
+ public static boolean isHeadVertexWithIndegree(VertexValueWritable value) {
+ return isHead(value) && !isHeadWithoutIndegree(value);
}
/**
@@ -39,6 +39,23 @@
}
/**
+ * Rear Vertex: in-degree > 0 and not a path vertex
+ */
+ public static boolean isRear(VertexValueWritable value){
+ return value.inDegree() > 0 && !isPathVertex(value);
+ }
+
+ /**
+ * Rear Vertex: in-degree > 0, and has outdegree
+ *
+ * @param vertexValue
+ */
+ public static boolean isRearVertexWithOutdegree(VertexValueWritable value) {
+ return isRear(value) && !isRearWithoutOutdegree(value);
+ }
+
+
+ /**
* Rear Vertex without outdegree: indegree = 1, outdegree = 0
*/
public static boolean isRearWithoutOutdegree(VertexValueWritable value){
@@ -93,4 +110,16 @@
public static boolean isDownBridgeVertex(VertexValueWritable value){
return value.inDegree() > 1 && value.outDegree() == 1;
}
+
+ /**
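+ * Get the first node ID from an adjacency list: the forward list is checked first, then the reverse list; returns null if both are empty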
+ */
+ public static PositionWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
+ if(adj.getForwardList().getCountOfPosition() > 0)
+ return adj.getForwardList().getPosition(0);
+ else if(adj.getReverseList().getCountOfPosition() > 0)
+ return adj.getReverseList().getPosition(0);
+ else
+ return null;
+ }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index f52957e..a63da20 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -183,8 +183,8 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
- genP3ForMergeGraph();
+ genLogAlgorithmForMergeGraph();
+ //genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
//genBridgeAddGraph();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index f86ef23..cd81a97 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,7 +45,7 @@
public static final String PreFix = "data/input"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "read"};/*, PreFix + File.separator
+ + "pathmerge"};/*, PreFix + File.separator
/*+ "CyclePath"};, PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index c5aa0eb..355bc7a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 3;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/tworeads.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -71,8 +71,8 @@
@Test
public void TestAll() throws Exception {
- TestEndToEnd();
- //TestUnMergedNode();
+ //TestEndToEnd();
+ TestUnMergedNode();
}
public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
deleted file mode 100644
index 6cfeda1..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>P3ForPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>P3ForPathMergeVertex.pseudoRate</name><value>0.3</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.ValueStateWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P3ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>P3ForPathMergeVertex.maxRound</name><value>2</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file