Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 95bcaf8..c3e53e8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -467,7 +467,8 @@
posnInByte = 0;
}
}
- bytes[offset] = cacheByte;
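+ // flush the partially-filled cacheByte only when it still holds leftover bases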
+ if(posnInByte > 0)
+ bytes[offset] = cacheByte;
clearLeadBit();
}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
index b4361ac..fbfbeeb 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerBytesWritableTest.java
@@ -229,14 +229,34 @@
merge.mergeWithRFKmer(i, kmer2);
Assert.assertEquals("GGCACAACAACCC", merge.toString());
- String test1 = "CTA";
- String test2 = "AGA";
+ String test1;
+ String test2;
+ test1 = "CTA";
+ test2 = "AGA";
KmerBytesWritable k1 = new KmerBytesWritable(3);
KmerBytesWritable k2 = new KmerBytesWritable(3);
k1.setByRead(test1.getBytes(), 0);
k2.setByRead(test2.getBytes(), 0);
k1.mergeWithRFKmer(3, k2);
- Assert.assertEquals("CTAT", k1);
+ Assert.assertEquals("TCTA", k1.toString());
+
+ test1 = "CTA";
+ test2 = "ATA"; //TAT
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("CTAT", k1.toString());
+
+ test1 = "ATA";
+ test2 = "CTA"; //TAT
+ k1 = new KmerBytesWritable(3);
+ k2 = new KmerBytesWritable(3);
+ k1.setByRead(test1.getBytes(), 0);
+ k2.setByRead(test2.getBytes(), 0);
+ k1.mergeWithFRKmer(3, k2);
+ Assert.assertEquals("ATAG", k1.toString());
}
@@ -281,5 +301,55 @@
}
}
}
-
+
+ @Test
+ public void TestFinalMerge() {
+ String selfString;
+ String match;
+ String msgString;
+ int index;
+ KmerBytesWritable kmer = new KmerBytesWritable();
+ int kmerSize = 3;
+
+ String F1 = "AATAG";
+ String F2 = "TAGAA";
+ String R1 = "CTATT";
+ String R2 = "TTCTA";
+
+ //FF test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = F2;
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //FR test
+ selfString = F1;
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(R2);
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RF test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(F2);
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+
+ //RR test
+ selfString = R1;
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = R2;
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+ }
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
new file mode 100644
index 0000000..0308913
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/P2PathMergeOutputFormat.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.genomix.pregelix.format;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+public class P2PathMergeOutputFormat extends
+ BinaryDataCleanVertexOutputFormat<KmerBytesWritable, VertexValueWritable, NullWritable> {
+
+ @Override
+ public VertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ RecordWriter<KmerBytesWritable, VertexValueWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+ return new BinaryLoadGraphVertexWriter(recordWriter);
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+ */
+ public static class BinaryLoadGraphVertexWriter extends
+ BinaryVertexWriter<KmerBytesWritable, VertexValueWritable, NullWritable> {
+ public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, VertexValueWritable> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<KmerBytesWritable, VertexValueWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ byte selfFlag = (byte)(vertex.getVertexValue().getState() & State.VERTEX_MASK);
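+ // emit only vertices whose state is flagged IS_FINAL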
+ if(selfFlag == State.IS_FINAL)
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ }
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index 1c5f325..6356715 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -22,6 +22,7 @@
private byte flag;
private boolean isFlip;
private int kmerlength = 0;
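+ // marks P2 update messages so they can be distinguished from merge messages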
+ private boolean updateMsg = false;
private byte checkMessage;
@@ -61,6 +62,7 @@
}
checkMessage |= CheckMessage.ADJMSG;
this.flag = msg.getFlag();
+ updateMsg = msg.isUpdateMsg();
}
public void set(int kmerlength, KmerBytesWritable sourceVertexId, KmerBytesWritable chainVertexId, AdjacencyListWritable neighberNode, byte message) {
@@ -82,17 +84,16 @@
}
public void reset() {
- checkMessage = 0;
- kmer.reset(1);
- neighberNode.reset();
- flag = Message.NON;
+ reset(0);
}
public void reset(int kmerSize) {
- checkMessage = 0;
+ checkMessage = (byte) 0;
+ kmerlength = kmerSize;
kmer.reset(1);
neighberNode.reset(kmerSize);
flag = Message.NON;
+ isFlip = false;
}
public KmerBytesWritable getSourceVertexId() {
@@ -148,6 +149,15 @@
this.isFlip = isFlip;
}
+
+ public boolean isUpdateMsg() {
+ return updateMsg;
+ }
+
+ public void setUpdateMsg(boolean updateMsg) {
+ this.updateMsg = updateMsg;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(kmerlength);
@@ -160,6 +170,7 @@
neighberNode.write(out);
out.writeBoolean(isFlip);
out.writeByte(flag);
+ out.writeBoolean(updateMsg);
}
@Override
@@ -175,6 +186,7 @@
neighberNode.readFields(in);
isFlip = in.readBoolean();
flag = in.readByte();
+ updateMsg = in.readBoolean();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 5d06234..065bfd5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -32,7 +32,7 @@
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
public static final byte SHOULD_MERGE_MASK = 0b11 << 3;
- public static final byte SHOULD_MERGE_CLEAR = 0b1110011;
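+ // bitwise complement of SHOULD_MERGE_MASK: clears only the two should-merge bits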
+ public static final byte SHOULD_MERGE_CLEAR = 0b1100111;
}
private PositionListWritable nodeIdList;
@@ -237,6 +237,26 @@
}
/*
+ * Delete the corresponding edge
+ */
+ public void processDelete(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete){
+ switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
+ case MessageFlag.DIR_FF:
+ this.getFFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_FR:
+ this.getFRList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RF:
+ this.getRFList().remove(nodeToDelete);
+ break;
+ case MessageFlag.DIR_RR:
+ this.getRRList().remove(nodeToDelete);
+ break;
+ }
+ }
+
+ /*
* Process any changes to value. This is for edge updates
*/
public void processUpdates(byte neighborToDeleteDir, KmerBytesWritable nodeToDelete,
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 937fa42..ec608c5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -13,6 +13,7 @@
import edu.uci.ics.genomix.pregelix.type.AdjMessage;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+import edu.uci.ics.genomix.type.GeneCode;
import edu.uci.ics.genomix.type.KmerBytesWritable;
/**
@@ -25,11 +26,11 @@
public static int kmerSize = -1;
protected int maxIteration = -1;
- protected MessageWritable incomingMsg = null; // = new MessageWritable();
- protected MessageWritable outgoingMsg = null; // = new MessageWritable();
+ protected MessageWritable incomingMsg = null;
+ protected MessageWritable outgoingMsg = null;
protected KmerBytesWritable destVertexId = new KmerBytesWritable();
protected Iterator<KmerBytesWritable> posIterator;
- private KmerBytesWritable kmer = new KmerBytesWritable();
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
byte headFlag;
protected byte outFlag;
protected byte inFlag;
@@ -96,10 +97,12 @@
public KmerBytesWritable getNextDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FF;
return posIterator.next();
} else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
posIterator = value.getFRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_FR;
return posIterator.next();
} else {
@@ -111,10 +114,12 @@
public KmerBytesWritable getPreDestVertexIdAndSetFlag(VertexValueWritable value) {
if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RF;
return posIterator.next();
} else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
posIterator = value.getRRList().iterator();
+ outFlag &= MessageFlag.DIR_CLEAR;
outFlag |= MessageFlag.DIR_RR;
return posIterator.next();
} else {
@@ -290,6 +295,7 @@
* set adjMessage to successor(from predecessor)
*/
public void setSuccessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getFFList().getLength() > 0)
outFlag |= MessageFlag.DIR_FF;
else if(getVertexValue().getFRList().getLength() > 0)
@@ -302,6 +308,7 @@
* set adjMessage to predecessor(from successor)
*/
public void setPredecessorAdjMsg(){
+ outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
else if(getVertexValue().getRRList().getLength() > 0)
@@ -342,18 +349,38 @@
}
/**
+ * send update message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendUpdateMsg(){
+ outgoingMsg.setUpdateMsg(true);
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
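+ // forward (FF/FR) requests trigger an update to the predecessor, reverse (RF/RR) to the successor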
+ switch(meToNeighborDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ sendUpdateMsgToPredecessor();
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ sendUpdateMsgToSuccessor();
+ break;
+ }
+ }
+
+ /**
* send merge message to neighbor for P2
* @throws IOException
*/
public void sendMergeMsg(){
- if(selfFlag == MessageFlag.IS_HEAD){
+ outgoingMsg.setUpdateMsg(false);
+ if(selfFlag == State.IS_HEAD){
byte newState = getVertexValue().getState();
newState &= ~State.IS_HEAD;
newState |= State.IS_OLDHEAD;
getVertexValue().setState(newState);
resetSelfFlag();
- outFlag |= MessageFlag.IS_HEAD;
- } else if(selfFlag == MessageFlag.IS_OLDHEAD){
+ outFlag |= MessageFlag.IS_HEAD;
+ } else if(selfFlag == State.IS_OLDHEAD){
outFlag |= MessageFlag.IS_OLDHEAD;
voteToHalt();
}
@@ -387,6 +414,46 @@
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
break;
}
+// if(headBecomeOldHead)
+// getVertexValue().processDelete(neighborToMeDir, incomingMsg.getSourceVertexId());
+ }
+
+ /**
+ * send final merge message to neighbor for P2
+ * @throws IOException
+ */
+ public void sendFinalMergeMsg(){
+ outFlag |= MessageFlag.IS_FINAL;
+ byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
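+ // reply to the requester with this vertex's kmer and adjacency list, flagged IS_FINAL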
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ case MessageFlag.DIR_FR:
+ setSuccessorAdjMsg();
+ if(ifFlipWithPredecessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getNextDestVertexId(getVertexValue())
+ break;
+ case MessageFlag.DIR_RF:
+ case MessageFlag.DIR_RR:
+ setPredecessorAdjMsg();
+ if(ifFilpWithSuccessor())
+ outgoingMsg.setFlip(true);
+ else
+ outgoingMsg.setFlip(false);
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ outgoingMsg.setChainVertexId(getVertexValue().getKmer());
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg); //getPreDestVertexId(getVertexValue())
+ break;
+ }
}
/**
@@ -428,6 +495,7 @@
public void setStateAsMergeWithNext(){
byte state = getVertexValue().getState();
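+ // clear any previously-set merge direction before recording the new one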
+ state &= State.SHOULD_MERGE_CLEAR;
state |= State.SHOULD_MERGEWITHNEXT;
getVertexValue().setState(state);
}
@@ -445,6 +513,7 @@
public void setStateAsMergeWithPrev(){
byte state = getVertexValue().getState();
+ state &= State.SHOULD_MERGE_CLEAR;
state |= State.SHOULD_MERGEWITHPREV;
getVertexValue().setState(state);
}
@@ -551,7 +620,7 @@
byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
@@ -559,6 +628,60 @@
}
/**
+ * perform the final merge and update the adjacency list for P2
+ */
+ public void processFinalMerge(MessageWritable msg){
+ byte meToNeighborDir = (byte) (msg.getFlag() & MessageFlag.DIR_MASK);
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
+
+ String selfString;
+ String match;
+ String msgString;
+ int index;
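+ // each case locates the (kmerSize - 1)-base overlap between this vertex's kmer and the
+ // incoming kmer, then rebuilds the portion of the message kmer to hand to processMerges()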
+ switch(neighborToMeDir){
+ case MessageFlag.DIR_FF:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = msg.getKmer().toString();
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByRead(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_FR:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(selfString.length() - kmerSize + 1,selfString.length());
+ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ index = msgString.indexOf(match);
+ kmer.reset(msgString.length() - index);
+ kmer.setByReadReverse(msgString.substring(index).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RF:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = GeneCode.reverseComplement(msg.getKmer().toString());
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByReadReverse(msgString.substring(0, index + 1).getBytes(), 0);
+ break;
+ case MessageFlag.DIR_RR:
+ selfString = getVertexValue().getKmer().toString();
+ match = selfString.substring(0,kmerSize - 1);
+ msgString = msg.getKmer().toString();
+ index = msgString.lastIndexOf(match) + kmerSize - 2;
+ kmer.reset(index + 1);
+ kmer.setByRead(msgString.substring(0, index + 1).getBytes(), 0);
+ System.out.println(kmer.toString());
+ break;
+ }
+
+ getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
+ neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
+ kmerSize, kmer);
+ }
+
+ /**
* set head state
*/
public void setHeadState(){
@@ -579,12 +702,12 @@
}
/**
- * set final state
+ * set stop flag
*/
public void setStopFlag(){
- byte state = incomingMsg.getFlag();
+ byte state = getVertexValue().getState();
state &= State.VERTEX_CLEAR;
- state |= State.IS_STOP;
+ state |= State.IS_FINAL;
getVertexValue().setState(state);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 7185616..3b5a782 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -4,14 +4,15 @@
import java.util.Iterator;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.oldtype.PositionWritable;
import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
/*
* vertexId: BytesWritable
* vertexValue: VertexValueWritable
@@ -44,7 +45,7 @@
BasicPathMergeVertex {
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
- PositionWritable tempPostition = new PositionWritable();
+ KmerBytesWritable tmpKmer = new KmerBytesWritable();
/**
* initiate kmerSize, maxIteration
@@ -54,9 +55,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
- selfFlag =(byte)(getVertexValue().getState() & MessageFlag.VERTEX_MASK);
- outgoingMsg.reset();
+ headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
+ selfFlag = (byte)(getVertexValue().getState() & State.VERTEX_MASK);
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable(kmerSize);
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable(kmerSize);
+ else
+ outgoingMsg.reset(kmerSize);
receivedMsgList.clear();
}
@@ -65,18 +71,18 @@
*/
public void sendOutMsg() {
//send wantToMerge to next
- tempPostition = getNextDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.set(tempPostition);
+ tmpKmer = getNextDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.set(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
}
- ////send wantToMerge to prev
- tempPostition = getPreDestVertexIdAndSetFlag(getVertexValue());
- if(tempPostition != null){
- destVertexId.set(tempPostition);
+ //send wantToMerge to prev
+ tmpKmer = getPreDestVertexIdAndSetFlag(getVertexValue());
+ if(tmpKmer != null){
+ destVertexId.set(tmpKmer);
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, outgoingMsg);
@@ -117,7 +123,7 @@
*/
public void sendMsgToPathVertex(Iterator<MessageWritable> msgIterator) {
//send out wantToMerge msg
- if(selfFlag != MessageFlag.IS_HEAD){
+ if(selfFlag != State.IS_HEAD && selfFlag != State.IS_OLDHEAD){
sendOutMsg();
}
}
@@ -126,17 +132,19 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- if(!msgIterator.hasNext() && selfFlag == MessageFlag.IS_HEAD){
- getVertexValue().setState(MessageFlag.IS_STOP);
+ if(!msgIterator.hasNext() && selfFlag == State.IS_HEAD){
+ outFlag |= MessageFlag.IS_FINAL;
sendOutMsg();
}
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
processMerge(incomingMsg);
- getVertexValue().setState(MessageFlag.IS_FINAL);
- }else
+ getVertexValue().setState(State.IS_FINAL);
+ }else{
+ sendUpdateMsg();
sendMergeMsg();
+ }
}
}
@@ -148,11 +156,13 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
if(getMsgFlag() == MessageFlag.IS_FINAL){
- setStopFlag();
- sendMergeMsg();
+ sendFinalMergeMsg();
break;
}
- receivedMsgList.add(incomingMsg);
+ if(incomingMsg.isUpdateMsg() && selfFlag == State.IS_OLDHEAD)
+ processUpdate();
+ else if(!incomingMsg.isUpdateMsg())
+ receivedMsgList.add(incomingMsg);
}
if(receivedMsgList.size() != 0){
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
@@ -160,22 +170,22 @@
case MessageFromHead.BothMsgsFromHead:
case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_FINAL);
+ processFinalMerge(receivedMsgList.get(i)); //processMerge()
+ getVertexValue().setState(State.IS_FINAL);
voteToHalt();
break;
case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
- getVertexValue().setState(MessageFlag.IS_HEAD);
+ processFinalMerge(receivedMsgList.get(i));
+ getVertexValue().setState(State.IS_HEAD);
break;
case MessageFromHead.BothMsgsFromNonHead:
for(int i = 0; i < 2; i++)
- processMerge(receivedMsgList.get(i));
+ processFinalMerge(receivedMsgList.get(i));
break;
case MessageFromHead.NO_MSG:
//halt
- deleteVertex(getVertexId());
+ voteToHalt(); //deleteVertex(getVertexId());
break;
}
}
@@ -188,12 +198,21 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
- voteToHalt();
+ if(msgIterator.hasNext()){ //for processing final merge
+ incomingMsg = msgIterator.next();
+ if(getMsgFlag() == MessageFlag.IS_FINAL){
+ setFinalState();
+ processFinalMerge(incomingMsg);
+ }
+ }
+ else{
+ sendMsgToPathVertex(msgIterator);
+ if(selfFlag != State.IS_HEAD)
+ voteToHalt();
+ }
} else if (getSuperstep() % 3 == 1 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- if(selfFlag != MessageFlag.IS_HEAD)
+ if(selfFlag != State.IS_HEAD)
voteToHalt();
} else if (getSuperstep() % 3 == 2 && getSuperstep() <= maxIteration){
processMergeInHeadVertex(msgIterator);
@@ -207,9 +226,9 @@
/**
* BinaryInput and BinaryOutput~/
*/
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(PositionWritable.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(VertexValueWritable.class);
job.setDynamicVertexValueSize(true);
Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 94428ca..c473867 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -78,8 +78,8 @@
incomingMsg = new MessageWritable(kmerSize);
if(outgoingMsg == null)
outgoingMsg = new MessageWritable(kmerSize);
- //if (randSeed < 0)
- // randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ else
+ outgoingMsg.reset(kmerSize);
randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
index 6e1bc4b..bee1dd8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/DirectionFlag.java
@@ -9,4 +9,5 @@
public static final byte DIR_RF = 0b011 << 0;
public static final byte DIR_RR = 0b100 << 0;
public static final byte DIR_MASK = 0b111 << 0;
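+ // complement of DIR_MASK: clears the three direction bits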
+ public static final byte DIR_CLEAR = 0b1111000 << 0;
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 0e798fe..e18c31b 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -7,6 +7,7 @@
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
+import edu.uci.ics.genomix.pregelix.format.P2PathMergeOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
@@ -42,21 +43,21 @@
// + "NaiveAlgorithmForMergeGraph.xml");
// }
-// private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
-// job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class); //LogAlgorithmForPathMergeOutputFormat
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genLogAlgorithmForMergeGraph() throws IOException {
-// generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
-// }
+ private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(P2PathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genLogAlgorithmForMergeGraph() throws IOException {
+ generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+ }
//
// private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -77,22 +78,22 @@
// + "P3ForMergeGraph.xml");
// }
- private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
- job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(VertexValueWritable.class);
- job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genP4ForMergeGraph() throws IOException {
- generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
- + "P4ForMergeGraph.xml");
- }
+// private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+// PregelixJob job = new PregelixJob(jobName);
+// job.setVertexClass(P4ForPathMergeVertex.class);
+// job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+// job.setDynamicVertexValueSize(true);
+// job.setOutputKeyClass(KmerBytesWritable.class);
+// job.setOutputValueClass(VertexValueWritable.class);
+// job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+// }
+//
+// private static void genP4ForMergeGraph() throws IOException {
+// generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+// + "P4ForMergeGraph.xml");
+// }
// private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
// PregelixJob job = new PregelixJob(jobName);
@@ -198,7 +199,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
+ genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
// genTipRemoveGraph();
@@ -206,7 +207,7 @@
// genBridgeRemoveGraph();
// genBubbleAddGraph();
// genBubbleMergeGraph();
- genP4ForMergeGraph();
+// genP4ForMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 5aedeb7..1578dfc 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -52,7 +52,7 @@
// + "6", PreFix + File.separator
// + "7", PreFix + File.separator
// + "8", PreFix + File.separator
- + "4"};
+ + "9"};
private static final String ACTUAL_RESULT_DIR = "data/actual/pathmerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
deleted file mode 100644
index 597e5c3..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.VertexValueWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P4ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.KmerBytesWritable</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>BasicPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
index 3d007d2..5a15ca0 100644
--- a/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
+++ b/genomix/genomix-pregelix/src/test/resources/only_pathmerge.txt
@@ -1 +1 @@
-P4ForMergeGraph.xml
+LogAlgorithmForMergeGraph.xml