add MapReduceVertex for mapper/reducer in pregel
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index 918fa1e..9d15e8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -34,7 +34,7 @@
@Option(name = "-plan", usage = "query plan choice", required = false)
public Plan planChoice = Plan.OUTER_JOIN;
- @Option(name = "-kmer-kmerByteSize", usage = "the kmerByteSize of kmer", required = false)
+ @Option(name = "-tmpKmer-kmerByteSize", usage = "the kmerByteSize of tmpKmer", required = false)
public int sizeKmer;
@Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 6356715..ba374be 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -54,7 +54,7 @@
}
if (kmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.kmer.set(msg.getKmer());
+ this.kmer.set(msg.getActualKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -107,14 +107,14 @@
}
}
- public KmerBytesWritable getKmer() {
+ public KmerBytesWritable getActualKmer() {
return kmer;
}
- public void setChainVertexId(KmerBytesWritable chainVertexId) {
- if (chainVertexId != null) {
+ public void setAcutalKmer(KmerBytesWritable actualKmer) {
+ if (actualKmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.kmer.set(chainVertexId);
+ this.kmer.set(actualKmer);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 065bfd5..48a9f52 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -14,7 +14,6 @@
public static class VertexStateFlag {
public static final byte IS_NON = 0b00 << 5;
public static final byte IS_RANDOMTAIL = 0b00 << 5;
- public static final byte IS_STOP = 0b00 << 5;
public static final byte IS_HEAD = 0b01 << 5;
public static final byte IS_FINAL = 0b10 << 5;
public static final byte IS_RANDOMHEAD = 0b11 << 5;
@@ -40,8 +39,8 @@
private AdjacencyListWritable outgoingList;
private byte state;
private KmerBytesWritable kmer;
- private KmerBytesWritable mergeDest;
private int kmerlength = 0;
+ private boolean isFakeVertex = false;
public VertexValueWritable() {
this(0);
@@ -54,7 +53,6 @@
outgoingList = new AdjacencyListWritable();
state = State.IS_NON;
kmer = new KmerBytesWritable(kmerSize);
- mergeDest = new KmerBytesWritable(kmerSize);
}
public VertexValueWritable(PositionListWritable nodeIdList, KmerListWritable forwardForwardList, KmerListWritable forwardReverseList,
@@ -143,6 +141,15 @@
public byte getState() {
return state;
}
+
+
+ public boolean isFakeVertex() {
+ return isFakeVertex;
+ }
+
+ public void setFakeVertex(boolean isFakeVertex) {
+ this.isFakeVertex = isFakeVertex;
+ }
public void setState(byte state) {
this.state = state;
@@ -159,16 +166,7 @@
public void setKmer(KmerBytesWritable kmer) {
this.kmer.set(kmer);
}
-
- public KmerBytesWritable getMergeDest() {
- return mergeDest;
- }
-
- public void setMergeDest(KmerBytesWritable mergeDest) {
- this.mergeDest = mergeDest;
- }
-
-
+
public int getKmerlength() {
return kmerlength;
}
@@ -196,7 +194,7 @@
this.outgoingList.readFields(in);
this.state = in.readByte();
this.kmer.readFields(in);
- this.mergeDest.readFields(in);
+ this.isFakeVertex = in.readBoolean();
}
@Override
@@ -207,7 +205,7 @@
this.outgoingList.write(out);
out.writeByte(this.state);
this.kmer.write(out);
- this.mergeDest.write(out);
+ out.writeBoolean(this.isFakeVertex);
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 5f6440f..82ca03f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -78,7 +78,7 @@
builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
if (msg.getLengthOfChain() != -1) {
- chain = msg.getKmer().toString();
+ chain = msg.getActualKmer().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 40e3191..b782294 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -150,17 +150,24 @@
public void broadcaseKillself(){
outgoingMsg.setSourceVertexId(getVertexId());
- if(getVertexValue().getFFList().getCountOfPosition() > 0) // #FFList() > 0
+ if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
outgoingMsg.setMessage(AdjMessage.FROMFF);
- else // #FRList() > 0
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ }
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
outgoingMsg.setMessage(AdjMessage.FROMFR);
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ }
- if(getVertexValue().getRFList().getCountOfPosition() > 0) // #RFList() > 0
+
+ if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
outgoingMsg.setMessage(AdjMessage.FROMRF);
- else // #RRList() > 0
+ sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+ }
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
outgoingMsg.setMessage(AdjMessage.FROMRR);
- sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+ sendMsg(incomingMsg.getStartVertexId(), outgoingMsg);
+ }
deleteVertex(getVertexId());
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 169d677..821117b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -30,7 +30,7 @@
protected MessageWritable outgoingMsg = null;
protected KmerBytesWritable destVertexId = new KmerBytesWritable();
protected Iterator<KmerBytesWritable> posIterator;
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+ protected KmerBytesWritable tmpKmer = new KmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -398,7 +398,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -411,7 +411,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
@@ -438,7 +438,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
break;
case MessageFlag.DIR_RF:
@@ -451,7 +451,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
@@ -474,7 +474,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -487,7 +487,7 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
deleteVertex(getVertexId());
break;
@@ -611,7 +611,7 @@
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getKmer());
+ kmerSize, incomingMsg.getActualKmer());
}
/**
@@ -625,7 +625,7 @@
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
- kmerSize, msg.getKmer());
+ kmerSize, msg.getActualKmer());
}
/**
@@ -645,40 +645,40 @@
case MessageFlag.DIR_FF:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = msg.getKmer().toString();
+ msgString = msg.getActualKmer().toString();
index = msgString.indexOf(match);
- kmer.reset(msgString.length() - index);
- kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ tmpKmer.reset(msgString.length() - index);
+ tmpKmer.setByRead(msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_FR:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
- msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.indexOf(match);
- kmer.reset(msgString.length() - index);
- kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+ tmpKmer.reset(msgString.length() - index);
+ tmpKmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
break;
case MessageFlag.DIR_RF:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(0,kmerSize - 1);
- msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ msgString = GeneCode.reverseComplement(msg.getActualKmer().toString());
index = msgString.lastIndexOf(match) + kmerSize - 2;
- kmer.reset(index + 1);
- kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ tmpKmer.reset(index + 1);
+ tmpKmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
break;
case MessageFlag.DIR_RR:
selfString = getVertexValue().getKmer().toString();
match = selfString.substring(0,kmerSize - 1);
- msgString = msg.getKmer().toString();
+ msgString = msg.getActualKmer().toString();
index = msgString.lastIndexOf(match) + kmerSize - 2;
- kmer.reset(index + 1);
- kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ tmpKmer.reset(index + 1);
+ tmpKmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
break;
}
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
- kmerSize, kmer);
+ kmerSize, tmpKmer);
}
/**
@@ -726,6 +726,34 @@
}
/**
+ * broadcast kill self to all neighbers Pre-condition: vertex is a path vertex
+ */
+ public void broadcaseKillself(){
+ outgoingMsg.setSourceVertexId(getVertexId());
+
+ if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
+ outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ sendMsg(getVertexValue().getFFList().getPosition(0), outgoingMsg);
+ }
+ else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
+ outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ sendMsg(getVertexValue().getFRList().getPosition(0), outgoingMsg);
+ }
+
+
+ if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
+ outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ sendMsg(getVertexValue().getRFList().getPosition(0), outgoingMsg);
+ }
+ else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
+ outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ sendMsg(getVertexValue().getRRList().getPosition(0), outgoingMsg);
+ }
+
+ deleteVertex(getVertexId());
+ }
+
+ /**
* do some remove operations on adjMap after receiving the info about dead Vertex
*/
public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
@@ -735,8 +763,8 @@
//remove incomingMsg.getSourceId from RR positionList
posIterator = getVertexValue().getRRList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ tmpKmer = posIterator.next();
+ if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -745,8 +773,8 @@
//remove incomingMsg.getSourceId from FR positionList
posIterator = getVertexValue().getFRList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ tmpKmer = posIterator.next();
+ if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -755,8 +783,8 @@
//remove incomingMsg.getSourceId from RF positionList
posIterator = getVertexValue().getRFList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ tmpKmer = posIterator.next();
+ if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
@@ -765,8 +793,8 @@
//remove incomingMsg.getSourceId from FF positionList
posIterator = getVertexValue().getFFList().iterator();
while(posIterator.hasNext()){
- kmer = posIterator.next();
- if(kmer.equals(incomingMsg.getSourceVertexId())){
+ tmpKmer = posIterator.next();
+ if(tmpKmer.equals(incomingMsg.getSourceVertexId())){
posIterator.remove();
break;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
new file mode 100644
index 0000000..0cc198b
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.genomix.pregelix.operator.pathmerge;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+public class MapReduceVertex extends
+ BasicPathMergeVertex {
+
+ KmerListWritable kmerList = new KmerListWritable(kmerSize);
+ Map<KmerBytesWritable, KmerListWritable> kmerMapper = new HashMap<KmerBytesWritable, KmerListWritable>();
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void compute(Iterator<MessageWritable> msgIterator) {
+ if(getSuperstep() == 1){
+ //add a fake vertex
+ Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+ KmerBytesWritable vertexId = new KmerBytesWritable();
+ VertexValueWritable vertexValue = new VertexValueWritable();
+ vertexId.setByRead(("XXX").getBytes(), 0);
+ vertexValue.setFakeVertex(true);
+ vertex.setVertexValue(vertexValue);
+
+ addVertex(vertexId, vertex);
+ }
+ else if(getSuperstep() == 2){
+ if(!getVertexValue().isFakeVertex()){
+ destVertexId.setByRead(("XXX").getBytes(), 0);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ voteToHalt();
+ } else if(getSuperstep() == 3){
+ kmerMapper.clear();
+ /** Mapper **/
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ tmpKmer.set(incomingMsg.getActualKmer());
+ if(!kmerMapper.containsKey(tmpKmer)){
+ kmerList.reset();
+ kmerList.append(incomingMsg.getSourceVertexId());
+ kmerMapper.put(tmpKmer, kmerList);
+ } else{
+ kmerList.set(kmerMapper.get(tmpKmer));
+ kmerMapper.put(tmpKmer, kmerList);
+ }
+ }
+ /** Reducer **/
+ for(KmerBytesWritable key : kmerMapper.keySet()){
+ kmerList = kmerMapper.get(key);
+ for(int i = 1; i < kmerList.getCountOfPosition(); i++){
+ //send kill message
+ outgoingMsg.setFlag(MessageFlag.KILL);
+ }
+ }
+ } else if(getSuperstep() == 4){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ inFlag = incomingMsg.getFlag();
+ if(inFlag == MessageFlag.KILL){
+ broadcaseKillself();
+ }
+ }
+ } else if(getSuperstep() == 5){
+ responseToDeadVertex(msgIterator);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(MapReduceVertex.class.getSimpleName());
+ job.setVertexClass(MapReduceVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index a65f556..9d27769 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -185,7 +185,7 @@
public void mergeChainVertex() {
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getKmer()));
+ incomingMsg.getActualKmer()));
getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
@@ -222,7 +222,7 @@
public void responseMsgToHeadVertex() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State.IS_HEAD)//is_tail
outgoingMsg.setFlag(Message.STOP);
destVertexId.set(incomingMsg.getSourceVertexId());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index f2bb3db..d806094 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -231,7 +231,7 @@
*/
public void mergeChainVertex(){
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getKmer()));
+ incomingMsg.getActualKmer()));
getVertexValue().setKmer(
kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
lastKmer));
@@ -271,7 +271,7 @@
public void responseMsgToHeadVertexMergePhase() {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.END_VERTEX)
outgoingMsg.setFlag(Message.STOP);
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
@@ -325,7 +325,7 @@
else {
deleteVertex(getVertexId());
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList()); //incomingMsg.getNeighberNode()
- outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
if (getVertexValue().getState() == State2.PSEUDOREAR)
outgoingMsg.setFlag(Message.FROMPSEUDOREAR);
else if (getVertexValue().getState() == State2.END_VERTEX)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index c473867..e3af9bf 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -211,7 +211,7 @@
//send message to the merge object and kill self
broadcastMergeMsg();
} else if (getSuperstep() % 4 == 2){
- //merge kmer
+ //merge tmpKmer
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index bee1dd8..ba8237d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -10,4 +10,6 @@
public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
+
+ public static final byte KILL = 0b111 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index e18c31b..eb17734 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -14,6 +14,7 @@
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
@@ -43,21 +44,21 @@
// + "NaiveAlgorithmForMergeGraph.xml");
// }
- private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genLogAlgorithmForMergeGraph() throws IOException {
- generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
- }
+// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+// job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+// job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(KmerBytesWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genLogAlgorithmForMergeGraph() throws IOException {
+// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+// }
//
// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -95,6 +96,21 @@
// + "P4ForMergeGraph.xml");
// }
+ private static void generateMapReduceGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MapReduceVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(MapReduceVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genMapReduceGraph() throws IOException {
+ generateMapReduceGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+ }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
// job.setVertexClass(TipAddVertex.class);
@@ -199,7 +215,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
+// genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -208,6 +224,7 @@
// genBubbleAddGraph();
// genBubbleMergeGraph();
// genP4ForMergeGraph();
+ genMapReduceGraph();
}
}