P2 pass test1,2,3,4
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
new file mode 100644
index 0000000..0308913
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class P2PathMergeOutputFormat extends
+ BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
+ if(selfFlag == State.IS_FINAL)
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 1c5f325..6356715 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -22,6 +22,7 @@
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
+ private boolean updateMsg = false;
private byte checkMessage;
@@ -61,6 +62,7 @@
}
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
+ updateMsg = msg.isUpdateMsg();
}
public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -82,17 +84,16 @@
}
public void reset() {
- checkMessage = 0;
- kmer.reset(1);
- neighberNode.reset();
- flag = Message.NON;
+ reset(0);
}
public void reset(int kmerSize) {
- checkMessage = 0;
+ checkMessage = (byte) 0;
+ kmerlength = kmerSize;
kmer.reset(1);
neighberNode.reset(kmerSize);
flag = Message.NON;
+ isFlip = false;
}
public KmerBytesWritable getSourceVertexId() {
@@ -148,6 +149,15 @@
this.isFlip = isFlip;
}
+
+ public boolean isUpdateMsg() {
+ return updateMsg;
+ }
+
+ public void setUpdateMsg(boolean updateMsg) {
+ this.updateMsg = updateMsg;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -160,6 +170,7 @@
neighberNode.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
+ out.writeBoolean(updateMsg);
}
@Override
@@ -175,6 +186,7 @@
neighberNode.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
+ updateMsg = in.readBoolean();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 5d06234..6d4f683 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -237,6 +237,26 @@
}
/*
+ * Delete the corresponding edge
+ */
+ public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ }
+
+ /*
* Process any changes to value. This is for edge updates
*/
public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 937fa42..8907491 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -25,8 +25,8 @@
public static int kmerSize = -1;
protected int maxIteration = -1;
- protected MessageWritable incomingMsg = null; // = new MessageWritable();
- protected MessageWritable outgoingMsg = null; // = new MessageWritable();
+ protected MessageWritable incomingMsg = null;
+ protected MessageWritable outgoingMsg = null;
protected KmerBytesWritable destVertexId = new KmerBytesWritable();
protected Iterator<KmerBytesWritable> posIterator;
private KmerBytesWritable kmer = new KmerBytesWritable();
@@ -96,10 +96,12 @@
public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
return posIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
posIterator = value.getFRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
return posIterator.next();
} else {
@@ -111,10 +113,12 @@
public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
return posIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
posIterator = value.getRRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
return posIterator.next();
} else {
@@ -290,6 +294,7 @@
* set adjMessage to successor(from predecessor)
*/
public void setSuccessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getFFList().getLength() > 0)
outFlag |= MessageFlag.DIR_FF;
else if(getVertexValue().getFRList().getLength() > 0)
@@ -302,6 +307,7 @@
* set adjMessage to predecessor(from successor)
*/
public void setPredecessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
else if(getVertexValue().getRRList().getLength() > 0)
@@ -342,18 +348,38 @@
}
/**
+ * send update message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendUpdateMsg(){
+ outgoingMsg.setUpdateMsg(true);
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ switch(meToNeighborDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ sendUpdateMsgToPredecessor();
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ sendUpdateMsgToSuccessor();
+ break;
+ }
+ }
+
+ /**
 * send merge message to neighbor for P2
* @throws IOException
*/
public void sendMergeMsg(){
- if(selfFlag == MessageFlag.IS_HEAD){
+ outgoingMsg.setUpdateMsg(false);
+ if(selfFlag == State.IS_HEAD){
byte newState = getVertexValue().getState();
newState &= ~State.IS_HEAD;
newState |= State.IS_OLDHEAD;
getVertexValue().setState(newState);
resetSelfFlag();
- outFlag |= MessageFlag.IS_HEAD;
- } else if(selfFlag == MessageFlag.IS_OLDHEAD){
+ outFlag |= MessageFlag.IS_HEAD;
+ } else if(selfFlag == State.IS_OLDHEAD){
outFlag |= MessageFlag.IS_OLDHEAD;
voteToHalt();
}
@@ -387,6 +413,46 @@
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
+// if(headBecomeOldHead)
+// getVertexValue().processDelete(neighborToMeDir, incomingMsg.getSourceVertexId());
+ }
+
+ /**
+ * send final merge message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendFinalMergeMsg(){
+ outFlag |= MessageFlag.IS_FINAL;
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
+ break;
+ }
}
/**
@@ -551,7 +617,7 @@
byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
@@ -579,12 +645,12 @@
}
/**
- * set final state
+ * set stop flag
*/
public void setStopFlag(){
- byte state = incomingMsg.getFlag();
+ byte state = getVertexValue().getState();
state &= State.VERTEX_CLEAR;
- state |= State.IS_STOP;
+ state |= State.IS_FINAL;
getVertexValue().setState(state);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 49bca8e..e2e8933 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -4,12 +4,12 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
import edu.uci.ics.genomix.type.KmerBytesWritable;
@@ -45,7 +45,7 @@
BasicPathMergeVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- KmerBytesWritable tempPostition = new KmerBytesWritable();
+ KmerBytesWritable tmpKmer = new KmerBytesWritable();
/**
* initiate kmerSize, maxIteration
@@ -55,9 +55,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
- selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
- outgoingMsg.reset();
+ headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
+ selfFlag = (byte)(getVertexValue().getState() & State.VERTEX_MASK);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
receivedMsgList.clear();
}
@@ -66,18 +71,18 @@
*/
public void sendOutMsg() {
//send wantToMerge to next
- tempPostition = getNextDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.set(tempPostition);
+ tmpKmer = getNextDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.set(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
}
- ////send wantToMerge to prev
- tempPostition = getPreDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.set(tempPostition);
+ //send wantToMerge to prev
+ tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.set(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
@@ -118,7 +123,7 @@
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
//send out wantToMerge msg
- if(selfFlag != MessageFlag.IS_HEAD){
+ if(selfFlag != State.IS_HEAD && selfFlag != State.IS_OLDHEAD){
sendOutMsg();
}
}
@@ -127,17 +132,19 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
- getVertexValue().setState(MessageFlag.IS_STOP);
+ if(!msgIterator.hasNext() && selfFlag == State.IS_HEAD){
+ outFlag |= MessageFlag.IS_FINAL;
sendOutMsg();
}
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
processMerge(incomingMsg);
- getVertexValue().setState(MessageFlag.IS_FINAL);
- }else
+ getVertexValue().setState(State.IS_FINAL);
+ }else{
+ sendUpdateMsg();
sendMergeMsg();
+ }
}
}
@@ -149,11 +156,13 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
- setStopFlag();
- sendMergeMsg();
+ sendFinalMergeMsg();
break;
}
- receivedMsgList.add(incomingMsg);
+ if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+ processUpdate();
+ else if(!incomingMsg.isUpdateMsg())
+ receivedMsgList.add(incomingMsg);
}
if(receivedMsgList.size() != 0){
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -162,13 +171,13 @@
case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
for(int i = 0; i < 2; i++)
processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_FINAL);
+ getVertexValue().setState(State.IS_FINAL);
voteToHalt();
break;
case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
for(int i = 0; i < 2; i++)
processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_HEAD);
+ getVertexValue().setState(State.IS_HEAD);
break;
case MessageFromHead.BothMsgsFromNonHead:
for(int i = 0; i < 2; i++)
@@ -189,12 +198,21 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
- voteToHalt();
+ if(msgIterator.hasNext()){ //for processing final merge
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setFinalState();
+ processMerge(incomingMsg);
+ }
+ }
+ else{
+ sendMsgToPathVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
+ }
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
+ if(selfFlag != State.IS_HEAD)
voteToHalt();
} else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
processMergeInHeadVertex(msgIterator);
@@ -209,8 +227,8 @@
* BinaryInput and BinaryOutput~/
*/
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 94428ca..c473867 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -78,8 +78,8 @@
incomingMsg = new MessageWritable(kmerSize);
if(outgoingMsg == null)
outgoingMsg = new MessageWritable(kmerSize);
- //if (randSeed < 0)
- // randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ else
+ outgoingMsg.reset(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 6e1bc4b..bee1dd8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -9,4 +9,5 @@
public static final byte DIR_RF = 0b011 << 0;
public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
+ public static final byte DIR_CLEAR = 0b1111000 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 0e798fe..e18c31b 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -7,6 +7,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -42,21 +43,21 @@
// + "NaiveAlgorithmForMergeGraph.xml");
// }
-// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
-// job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genLogAlgorithmForMergeGraph() throws IOException {
-// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
-// }
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+ }
//
// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -77,22 +78,22 @@
// + "P3ForMergeGraph.xml");
// }
- private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genP4ForMergeGraph() throws IOException {
- generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
- + "P4ForMergeGraph.xml");
- }
+// private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(P4ForPathMergeVertex.class);
+// job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(KmerBytesWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genP4ForMergeGraph() throws IOException {
+// generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+// + "P4ForMergeGraph.xml");
+// }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -198,7 +199,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -206,7 +207,7 @@
// genBridgeRemoveGraph();
// genBubbleAddGraph();
// genBubbleMergeGraph();
- genP4ForMergeGraph();
+// genP4ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 115f090..5aedeb7 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,14 +45,14 @@
public static final String PreFix = "data/PathMergeTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "2", PreFix + File.separator
- + "3", PreFix + File.separator
- + "4", PreFix + File.separator
- + "5", PreFix + File.separator
- + "6", PreFix + File.separator
- + "7", PreFix + File.separator
- + "8", PreFix + File.separator
- + "9"};
+// + "2", PreFix + File.separator
+// + "3", PreFix + File.separator
+// + "4", PreFix + File.separator
+// + "5", PreFix + File.separator
+// + "6", PreFix + File.separator
+// + "7", PreFix + File.separator
+// + "8", PreFix + File.separator
+ + "4"};
private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
deleted file mode 100644
index 597e5c3..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P4ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.KmerBytesWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
index 3d007d2..5a15ca0 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
@@ -1 +1 @@
-P4ForMergeGraph.xml
+LogAlgorithmForMergeGraph.xml