Merge MapReduceVertex into P2; P2 is now complete
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
index 0308913..81c6a33 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -9,6 +9,7 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -39,6 +40,7 @@
byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
if(selfFlag == State.IS_FINAL)
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ LogAlgorithmForPathMergeVertex.fakeVertexExist = false;
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index f1ed1ad..f67cac6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -20,6 +20,9 @@
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
+
+ public static final byte KILL = 0b11 << 3;
+ public static final byte KILL_MASK = 0b11 << 3;
}
public static class VertexStateFlag extends FakeFlag {
@@ -37,6 +40,8 @@
public static class FakeFlag{
public static final byte IS_NONFAKE = 0 << 0;
public static final byte IS_FAKE = 1 << 0;
+
+ //mask selecting the fake/non-fake bit (bit 0); written as a binary literal because
+ //a literal with leading zeros such as 00000001 is parsed as octal
+ public static final byte FAKEFLAG_MASK = 0b1 << 0;
}
private PositionListWritable nodeIdList;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
index 8cb8382..8bddd6b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicGraphCleanVertex.java
@@ -727,24 +727,31 @@
* broadcast kill self to all neighbers Pre-condition: vertex is a path vertex
*/
public void broadcaseKillself(){
+ outFlag = 0;
+ outFlag |= MessageFlag.KILL;
+ outFlag |= MessageFlag.DIR_FROM_DEADVERTEX;
outgoingMsg.setSourceVertexId(getVertexId());
if(getVertexValue().getFFList().getCountOfPosition() > 0){//#FFList() > 0
- outgoingMsg.setFlag(MessageFlag.DIR_FF);
+ outFlag |= MessageFlag.DIR_FF;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexValue().getFFList().getPosition(0), outgoingMsg);
}
else if(getVertexValue().getFRList().getCountOfPosition() > 0){//#FRList() > 0
- outgoingMsg.setFlag(MessageFlag.DIR_FR);
+ outFlag |= MessageFlag.DIR_FR;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexValue().getFRList().getPosition(0), outgoingMsg);
}
if(getVertexValue().getRFList().getCountOfPosition() > 0){//#RFList() > 0
- outgoingMsg.setFlag(MessageFlag.DIR_RF);
+ outFlag |= MessageFlag.DIR_RF;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexValue().getRFList().getPosition(0), outgoingMsg);
}
else if(getVertexValue().getRRList().getCountOfPosition() > 0){//#RRList() > 0
- outgoingMsg.setFlag(MessageFlag.DIR_RR);
+ outFlag |= MessageFlag.DIR_RR;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexValue().getRRList().getPosition(0), outgoingMsg);
}
@@ -754,10 +761,9 @@
/**
* do some remove operations on adjMap after receiving the info about dead Vertex
*/
- public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(incomingMsg.getFlag() == AdjMessage.FROMFF){
+ public void responseToDeadVertex(){
+ switch(incomingMsg.getFlag() & MessageFlag.DIR_MASK){
+ case MessageFlag.DIR_FF:
//remove incomingMsg.getSourceId from RR positionList
posIterator = getVertexValue().getRRList().iterator();
while(posIterator.hasNext()){
@@ -767,7 +773,8 @@
break;
}
}
- } else if(incomingMsg.getFlag() == AdjMessage.FROMFR){
+ break;
+ case MessageFlag.DIR_FR:
//remove incomingMsg.getSourceId from FR positionList
posIterator = getVertexValue().getFRList().iterator();
while(posIterator.hasNext()){
@@ -777,7 +784,8 @@
break;
}
}
- } else if(incomingMsg.getFlag() == AdjMessage.FROMRF){
+ break;
+ case MessageFlag.DIR_RF:
//remove incomingMsg.getSourceId from RF positionList
posIterator = getVertexValue().getRFList().iterator();
while(posIterator.hasNext()){
@@ -787,7 +795,8 @@
break;
}
}
- } else{ //incomingMsg.getFlag() == AdjMessage.FROMRR
+ break;
+ case MessageFlag.DIR_RR:
//remove incomingMsg.getSourceId from FF positionList
posIterator = getVertexValue().getFFList().iterator();
while(posIterator.hasNext()){
@@ -797,10 +806,15 @@
break;
}
}
- }
+ break;
}
}
+ /**
+ * Tell whether the message currently held in incomingMsg is a kill message,
+ * i.e. its flag masked with MessageFlag.KILL_MASK equals MessageFlag.KILL.
+ */
+ public boolean isKillMsg(){
+ return (byte) (incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL;
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index d9cfc8c..4e543ce 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -13,6 +13,7 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
/*
* vertexId: BytesWritable
* vertexValue: VertexValueWritable
@@ -42,11 +43,12 @@
* The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
public class LogAlgorithmForPathMergeVertex extends
- BasicGraphCleanVertex {
+ MapReduceVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
KmerBytesWritable tmpKmer = new KmerBytesWritable();
-
+
+ private boolean isFakeVertex = false;
/**
* initiate kmerSize, maxIteration
*/
@@ -64,6 +66,18 @@
else
outgoingMsg.reset(kmerSize);
receivedMsgList.clear();
+ if(reverseKmer == null)
+ reverseKmer = new KmerBytesWritable(kmerSize);
+ if(kmerList == null)
+ kmerList = new KmerListWritable(kmerSize);
+ else
+ kmerList.reset(kmerSize);
+ if(fakeVertex == null){
+ fakeVertex = new KmerBytesWritable(kmerSize + 1);
+ String random = generaterRandomString(kmerSize + 1);
+ fakeVertex.setByRead(random.getBytes(), 0);
+ }
+ isFakeVertex = ((byte)getVertexValue().getState() & State.FAKEFLAG_MASK) > 0 ? true : false;
}
/**
@@ -138,7 +152,14 @@
}
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- if(getMsgFlag() == MessageFlag.IS_FINAL){
+ /** final Vertex Responses To FakeVertex **/
+ if((byte)(incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL){
+ if((byte)(incomingMsg.getFlag() & MessageFlag.DIR_MASK) == MessageFlag.DIR_FROM_DEADVERTEX){
+ responseToDeadVertex();
+ } else{
+ broadcaseKillself();
+ }
+ }else if(getMsgFlag() == MessageFlag.IS_FINAL){
processMerge(incomingMsg);
getVertexValue().setState(State.IS_FINAL);
}else{
@@ -156,15 +177,24 @@
//process merge when receiving msg
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- /** for final processing **/
- if(getMsgFlag() == MessageFlag.IS_FINAL){
- sendFinalMergeMsg();
- break;
+ /** final Vertex Responses To FakeVertex **/
+ if((byte)(incomingMsg.getFlag() & MessageFlag.KILL_MASK) == MessageFlag.KILL){
+ if((byte)(incomingMsg.getFlag() & MessageFlag.DIR_MASK) == MessageFlag.DIR_FROM_DEADVERTEX){
+ responseToDeadVertex();
+ } else{
+ broadcaseKillself();
+ }
+ } else {
+ /** for final processing **/
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ sendFinalMergeMsg();
+ break;
+ }
+ if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+ processUpdate();
+ else if(!incomingMsg.isUpdateMsg())
+ receivedMsgList.add(incomingMsg);
}
- if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
- processUpdate();
- else if(!incomingMsg.isUpdateMsg())
- receivedMsgList.add(incomingMsg);
}
if(receivedMsgList.size() != 0){
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -174,6 +204,8 @@
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i)); //processMerge()
getVertexValue().setState(State.IS_FINAL);
+ /** NON-FAKE and Final vertices send msg to FAKE vertex **/
+ sendMsgToFakeVertex();
voteToHalt();
break;
case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
@@ -195,31 +227,63 @@
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
- if (getSuperstep() == 1)
+ if (getSuperstep() == 1){
+ addFakeVertex();
startSendMsg();
- else if (getSuperstep() == 2)
+ }
+ else if (getSuperstep() == 2){
+ if(!msgIterator.hasNext() && isFakeVertex)
+ voteToHalt();
initState(msgIterator);
+ }
else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- /** for processing final merge **/
- if(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- if(getMsgFlag() == MessageFlag.IS_FINAL){
- setFinalState();
- processFinalMerge(incomingMsg);
+ if(!isFakeVertex){
+ /** for processing final merge **/
+ if(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setFinalState();
+ processFinalMerge(incomingMsg);
+ /** NON-FAKE and Final vertices send msg to FAKE vertex **/
+ sendMsgToFakeVertex();
+ } else if(isKillMsg()){
+ responseToDeadVertex();
+ }
+ }
+ /** processing general case **/
+ else{
+ sendMsgToPathVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
}
}
- /** processing general case **/
+ /** Fake vertex aggregates messages and groups them by actual kmer **/
else{
- sendMsgToPathVertex(msgIterator);
- if(selfFlag != State.IS_HEAD)
- voteToHalt();
+ kmerMapper.clear();
+ /** Mapper **/
+ mapKeyByActualKmer(msgIterator);
+ /** Reducer **/
+ reduceKeyByActualKmer();
+ voteToHalt();
}
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
- responseMsgToHeadVertex(msgIterator);
- if(selfFlag != State.IS_HEAD)
+ if(!isFakeVertex){
+ responseMsgToHeadVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
+ }
+ /** Fake vertex aggregates messages and groups them by actual kmer **/
+ else{
+ kmerMapper.clear();
+ /** Mapper **/
+ mapKeyByActualKmer(msgIterator);
+ /** Reducer **/
+ reduceKeyByActualKmer();
voteToHalt();
+ }
} else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
- processMergeInHeadVertex(msgIterator);
+ if(!isFakeVertex)
+ processMergeInHeadVertex(msgIterator);
}else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index 9d0bb13..a12c583 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -21,12 +21,12 @@
public class MapReduceVertex extends
BasicGraphCleanVertex {
- private static boolean fakeVertexExist = false;
- private static KmerBytesWritable fakeVertex = null;
+ public static boolean fakeVertexExist = false;
+ protected static KmerBytesWritable fakeVertex = null;
- private KmerBytesWritable reverseKmer;
- private KmerListWritable kmerList = null;
- private Map<KmerBytesWritable, KmerListWritable> kmerMapper = new HashMap<KmerBytesWritable, KmerListWritable>();
+ protected KmerBytesWritable reverseKmer;
+ protected KmerListWritable kmerList = null;
+ protected Map<KmerBytesWritable, KmerListWritable> kmerMapper = new HashMap<KmerBytesWritable, KmerListWritable>();
/**
* initiate kmerSize, maxIteration
@@ -69,82 +69,110 @@
return sb.toString();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ /**
+ * Add the fake vertex to the graph unless fakeVertexExist says it was already added.
+ * The new vertex is keyed by fakeVertex, has its message and edge lists cleared, and
+ * carries a value whose state is State.IS_FAKE and whose fake-vertex marker is set.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void addFakeVertex(){
+ if(fakeVertexExist)
+ return;
+ //value that marks the new vertex as fake
+ VertexValueWritable fakeValue = new VertexValueWritable(kmerSize + 1);
+ fakeValue.setState(State.IS_FAKE);
+ fakeValue.setFakeVertex(true);
+
+ //vertex obtained from BspUtils.createVertex, emptied of messages and edges
+ Vertex fake = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ fake.getMsgList().clear();
+ fake.getEdges().clear();
+ fake.setVertexId(fakeVertex);
+ fake.setVertexValue(fakeValue);
+
+ addVertex(fakeVertex, fake);
+ fakeVertexExist = true;
+ }
+
+ /**
+ * Send this vertex's id and kmer to the fake vertex, then vote to halt.
+ * Does nothing when this vertex's value reports isFakeVertex().
+ */
+ public void sendMsgToFakeVertex(){
+ if(getVertexValue().isFakeVertex())
+ return;
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
+ sendMsg(fakeVertex, outgoingMsg);
+ voteToHalt();
+ }
+
+ /**
+ * Mapper step: group the source vertex ids of all incoming messages by actual kmer.
+ * A kmer and its reverse form (as produced by setByReadReverse) share one group; the
+ * group key is whichever of the two compares lower. Groups accumulate in kmerMapper.
+ *
+ * Each group owns its key and its id list. tmpKmer is only a scratch buffer that is
+ * overwritten for every message, so it must never be stored in the map itself:
+ * mutating an object that is already a HashMap key corrupts the map, and sharing one
+ * list between all entries makes every group report the same ids.
+ *
+ * @param msgIterator messages received in this superstep
+ */
+ public void mapKeyByActualKmer(Iterator<MessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ String kmerString = incomingMsg.getActualKmer().toString();
+ tmpKmer.reset(kmerString.length());
+ reverseKmer.reset(kmerString.length());
+ tmpKmer.setByRead(kmerString.getBytes(), 0);
+ reverseKmer.setByReadReverse(kmerString.getBytes(), 0);
+
+ //canonical form: keep the lower of the kmer and its reverse
+ if(reverseKmer.compareTo(tmpKmer) < 0)
+ tmpKmer.set(reverseKmer);
+
+ KmerListWritable sourceIds = kmerMapper.get(tmpKmer);
+ if(sourceIds == null){
+ //first message for this kmer: register a private key and a private list
+ KmerBytesWritable key = new KmerBytesWritable(kmerString.length());
+ //NOTE(review): assumes set() copies the kmer bytes rather than aliasing them - confirm
+ key.set(tmpKmer);
+ sourceIds = new KmerListWritable(kmerSize);
+ kmerMapper.put(key, sourceIds);
+ }
+ sourceIds.append(incomingMsg.getSourceVertexId());
+ }
+ }
+
+ /**
+ * Reducer step: for every group in kmerMapper, send a message flagged
+ * MessageFlag.KILL to each listed vertex except the one at position 0.
+ */
+ public void reduceKeyByActualKmer(){
+ for(KmerBytesWritable actualKmer : kmerMapper.keySet()){
+ kmerList = kmerMapper.get(actualKmer);
+ //position 0 is skipped; every later position receives a kill message
+ int duplicate = 1;
+ while(duplicate < kmerList.getCountOfPosition()){
+ outgoingMsg.setFlag(MessageFlag.KILL);
+ destVertexId.set(kmerList.getPosition(duplicate));
+ sendMsg(destVertexId, outgoingMsg);
+ duplicate++;
+ }
+ }
+ }
+
+ /**
+ * Drain the incoming messages and call broadcaseKillself() once for each message
+ * whose flag is exactly MessageFlag.KILL; all other messages are ignored.
+ *
+ * @param msgIterator messages received in this superstep
+ */
+ public void finalVertexResponseToFakeVertex(Iterator<MessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ inFlag = incomingMsg.getFlag();
+ if(inFlag != MessageFlag.KILL)
+ continue;
+ broadcaseKillself();
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
if(getSuperstep() == 1){
- if(!fakeVertexExist){
- //add a fake vertex
- Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
- vertex.getMsgList().clear();
- vertex.getEdges().clear();
- VertexValueWritable vertexValue = new VertexValueWritable(kmerSize + 1);
- vertexValue.setState(State.IS_FAKE);
- vertexValue.setFakeVertex(true);
-
- vertex.setVertexId(fakeVertex);
- vertex.setVertexValue(vertexValue);
-
- addVertex(fakeVertex, vertex);
- fakeVertexExist = true;
- }
+ addFakeVertex();
}
else if(getSuperstep() == 2){
- if(!getVertexValue().isFakeVertex()){
- outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setAcutalKmer(getVertexValue().getKmer());
- sendMsg(fakeVertex, outgoingMsg);
- voteToHalt();
- }
+ /** NON-FAKE and Final vertices send msg to FAKE vertex **/
+ sendMsgToFakeVertex();
} else if(getSuperstep() == 3){
kmerMapper.clear();
/** Mapper **/
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- String kmerString = incomingMsg.getActualKmer().toString();
- tmpKmer.reset(kmerString.length());
- reverseKmer.reset(kmerString.length());
- tmpKmer.setByRead(kmerString.getBytes(), 0);
- reverseKmer.setByReadReverse(kmerString.getBytes(), 0);
-
- if(reverseKmer.compareTo(tmpKmer) < 0)
- tmpKmer.set(reverseKmer);
- if(!kmerMapper.containsKey(tmpKmer)){
- kmerList.reset();
- kmerList.append(incomingMsg.getSourceVertexId());
- kmerMapper.put(tmpKmer, kmerList);
- } else{
- kmerList.set(kmerMapper.get(tmpKmer));
- kmerList.append(incomingMsg.getSourceVertexId());
- kmerMapper.put(tmpKmer, kmerList);
- }
- }
+ mapKeyByActualKmer(msgIterator);
/** Reducer **/
- for(KmerBytesWritable key : kmerMapper.keySet()){
- kmerList = kmerMapper.get(key);
- for(int i = 1; i < kmerList.getCountOfPosition(); i++){
- //send kill message
- outgoingMsg.setFlag(MessageFlag.KILL);
- destVertexId.set(kmerList.getPosition(i));
- sendMsg(destVertexId, outgoingMsg);
- }
- }
+ reduceKeyByActualKmer();
} else if(getSuperstep() == 4){
/** only for test single MapReduce job**/
if(!msgIterator.hasNext() && getVertexValue().getState() == State.IS_FAKE){
fakeVertexExist = false;
deleteVertex(fakeVertex);
}
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- inFlag = incomingMsg.getFlag();
- if(inFlag == MessageFlag.KILL){
- broadcaseKillself();
- }
- }
+ finalVertexResponseToFakeVertex(msgIterator);
} else if(getSuperstep() == 5){
- responseToDeadVertex(msgIterator);
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(isKillMsg())
+ responseToDeadVertex();
+ }
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index ba8237d..2ed36e9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -11,5 +11,5 @@
public static final byte DIR_MASK = 0b111 << 0;
public static final byte DIR_CLEAR = 0b1111000 << 0;
- public static final byte KILL = 0b111 << 0;
+ public static final byte DIR_FROM_DEADVERTEX = 0b101 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 6e43552..b34c9a9 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -44,21 +44,21 @@
// + "NaiveAlgorithmForMergeGraph.xml");
// }
-// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
-// job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
-// job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(KmerBytesWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genLogAlgorithmForMergeGraph() throws IOException {
-// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
-// }
+ /**
+ * Build the Pregelix job that runs LogAlgorithmForPathMergeVertex (kmer size 3) and
+ * write its configuration as XML to outputPath.
+ *
+ * @param jobName name given to the PregelixJob
+ * @param outputPath file that receives the XML configuration
+ * @throws IOException if the job cannot be created or the file cannot be written
+ */
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+ //close the stream even when writeXml fails, so the file handle is not leaked
+ try (FileOutputStream xmlOut = new FileOutputStream(new File(outputPath))) {
+ job.getConfiguration().writeXml(xmlOut);
+ }
+ }
+
+ /** Generate the "LogAlgorithmForMergeGraph" job and store it as LogAlgorithmForMergeGraph.xml under outputBase. */
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ String jobName = "LogAlgorithmForMergeGraph";
+ String xmlPath = outputBase + "LogAlgorithmForMergeGraph.xml";
+ generateLogAlgorithmForMergeGraphJob(jobName, xmlPath);
+ }
//
// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -215,7 +215,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
-// genLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -224,7 +224,7 @@
// genBubbleAddGraph();
// genBubbleMergeGraph();
// genP4ForMergeGraph();
- genMapReduceGraph();
+// genMapReduceGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 8172fb6..115f090 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -43,7 +43,7 @@
public class PathMergeSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(PathMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/LogAlgorithmForMergeGraph/bin"; //"graphbuildresult";
+ public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
+ "2", PreFix + File.separator
+ "3", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
index 710a096..5a15ca0 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
@@ -1 +1 @@
-MapReduceGraph.xml
+LogAlgorithmForMergeGraph.xml