code review with Jake
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index e8b241c..2c4ac0d 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -39,8 +39,9 @@
}
public static class MergeDirFlag extends DirectionFlag{
- public static final byte SHOULD_MERGEWITHNEXT = 0b000 << 0;
- public static final byte SHOULD_MERGEWITHPREV = 0b001 << 0;
+ public static final byte SHOULD_MERGEWITHNEXT = 0b0 << 2;
+ public static final byte SHOULD_MERGEWITHPREV = 0b1 << 2;
+ public static final byte SHOULD_MERGE_MASK = 0b1 << 2;
}
private PositionWritable nodeID;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
index c793609..8620445 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/PositionListWritable.java
@@ -163,10 +163,12 @@
}
public void append(PositionWritable pos) {
- setSize((1 + valueCount) * PositionWritable.LENGTH);
- System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
- * PositionWritable.LENGTH, pos.getLength());
- valueCount += 1;
+ if(pos != null){
+ setSize((1 + valueCount) * PositionWritable.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), storage, offset + valueCount
+ * PositionWritable.LENGTH, pos.getLength());
+ valueCount += 1;
+ }
}
public void append(int readID, byte posInRead) {
diff --git a/genomix/genomix-pregelix/data/graphbuild.test/read.txt b/genomix/genomix-pregelix/data/graphbuild.test/read.txt
index bd5f3c4..37f02de 100755
--- a/genomix/genomix-pregelix/data/graphbuild.test/read.txt
+++ b/genomix/genomix-pregelix/data/graphbuild.test/read.txt
@@ -1 +1 @@
-1 AATAGAAC
+1 AATAGAA
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
index 4c38255..c4365ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/DataCleanOutputFormat.java
@@ -8,7 +8,6 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryDataCleanVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -36,8 +35,7 @@
@Override
public void writeVertex(Vertex<PositionWritable, VertexValueWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- if(vertex.getVertexValue().getState() != State.END_VERTEX)
- getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
+ getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
index df49879..314dff6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/MessageWritable.java
@@ -19,7 +19,7 @@
* file stores the point to the file that stores the chains of connected DNA
*/
private PositionWritable sourceVertexId;
- private KmerBytesWritable chainVertexId;
+ private KmerBytesWritable kmer;
private AdjacencyListWritable neighberNode; //incoming or outgoing
private byte flag;
@@ -27,7 +27,7 @@
public MessageWritable() {
sourceVertexId = new PositionWritable();
- chainVertexId = new KmerBytesWritable(0);
+ kmer = new KmerBytesWritable(0);
neighberNode = new AdjacencyListWritable();
flag = Message.NON;
checkMessage = (byte) 0;
@@ -39,9 +39,9 @@
checkMessage |= CheckMessage.SOURCE;
this.sourceVertexId.set(msg.getSourceVertexId());
}
- if (chainVertexId != null) {
+ if (kmer != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(msg.getChainVertexId());
+ this.kmer.set(msg.getKmer());
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -59,7 +59,7 @@
}
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
+ this.kmer.set(chainVertexId);
}
if (neighberNode != null) {
checkMessage |= CheckMessage.NEIGHBER;
@@ -70,7 +70,7 @@
public void reset() {
checkMessage = 0;
- chainVertexId.reset(1);
+ kmer.reset(1);
neighberNode.reset();
flag = Message.NON;
}
@@ -86,14 +86,14 @@
}
}
- public KmerBytesWritable getChainVertexId() {
- return chainVertexId;
+ public KmerBytesWritable getKmer() {
+ return kmer;
}
public void setChainVertexId(KmerBytesWritable chainVertexId) {
if (chainVertexId != null) {
checkMessage |= CheckMessage.CHAIN;
- this.chainVertexId.set(chainVertexId);
+ this.kmer.set(chainVertexId);
}
}
@@ -109,7 +109,7 @@
}
public int getLengthOfChain() {
- return chainVertexId.getKmerLength();
+ return kmer.getKmerLength();
}
public byte getFlag() {
@@ -126,7 +126,7 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.write(out);
if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.write(out);
+ kmer.write(out);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.write(out);
out.write(flag);
@@ -139,7 +139,7 @@
if ((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
if ((checkMessage & CheckMessage.CHAIN) != 0)
- chainVertexId.readFields(in);
+ kmer.readFields(in);
if ((checkMessage & CheckMessage.NEIGHBER) != 0)
neighberNode.readFields(in);
flag = in.readByte();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 104fe84..2defa0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -4,7 +4,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.hadoop.pmcommon.NodeWithFlagWritable.MessageFlag;
+import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.PositionListWritable;
@@ -174,6 +174,10 @@
*/
public void processUpdates(byte neighborToDeleteDir, PositionWritable nodeToDelete,
byte neighborToMergeDir, PositionWritable nodeToAdd){
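+// TODO: replace the per-direction switch below with a generic list lookup, e.g.:
+// this.getListFromDir(neighborToDeleteDir).remove(nodeToDelete);
+// this.getListFromDir(neighborToMergeDir).append(nodeToDelete); // NOTE(review): should this append nodeToAdd? confirm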
+
switch (neighborToDeleteDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getFFList().remove(nodeToDelete);
@@ -224,6 +228,9 @@
this.getRRList().remove(nodeToDelete);
break;
}
+ // TODO: remove switch below and replace with general direction merge
+// this.getKmer().mergeWithDirKmer(neighborToMergeDir);
+
switch (neighborToMergeDir & MessageFlag.DIR_MASK) {
case MessageFlag.DIR_FF:
this.getKmer().mergeWithFFKmer(kmerSize, kmer);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index 0e4ff95..c29e1b3 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -78,7 +78,7 @@
builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
if (msg.getLengthOfChain() != -1) {
- chain = msg.getChainVertexId().toString();
+ chain = msg.getKmer().toString();
builder.append("Chain Message: " + chain + "\r\n");
builder.append("Chain Length: " + msg.getLengthOfChain() + "\r\n");
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 8e234e9..9238f9d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -27,6 +27,7 @@
protected MessageWritable outgoingMsg = new MessageWritable();
protected PositionWritable destVertexId = new PositionWritable();
protected Iterator<PositionWritable> posIterator;
+ byte headFlag;
protected byte outFlag;
/**
@@ -42,6 +43,13 @@
}
/**
+ * Recompute headFlag as (current vertex state & MessageFlag.IS_HEAD).
+ */
+ public void resetHeadFlag(){
+ headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
+ }
+
+ /**
* get destination vertex
*/
public PositionWritable getNextDestVertexId(VertexValueWritable value) {
@@ -205,7 +213,7 @@
if(getVertexValue().getRFList().getLength() > 0)
outFlag |= MessageFlag.DIR_RF;
else
- outFlag |= MessageFlag.DIR_RF;
+ outFlag |= MessageFlag.DIR_RR;
}
/**
@@ -248,8 +256,10 @@
newState |= MessageFlag.IS_OLDHEAD;
getVertexValue().setState(newState);
outFlag |= MessageFlag.IS_HEAD;
+ voteToHalt();
} else if((getVertexValue().getState() & MessageFlag.IS_OLDHEAD) > 0){
outFlag |= MessageFlag.IS_OLDHEAD;
+ voteToHalt();
}
byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
@@ -286,7 +296,7 @@
public void broadcastMergeMsg(){
if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & 0b0001){
+ switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK) {
case MessageFlag.SHOULD_MERGEWITHNEXT:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
@@ -314,7 +324,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgFromPredecessor(){
+ public void sendUpMsgToPredecessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHNEXT;
getVertexValue().setState(state);
@@ -329,7 +339,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgFromSuccessor(){
+ public void sendUpMsgToSuccessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHPREV;
getVertexValue().setState(state);
@@ -398,7 +408,7 @@
}
/**
- * merge and updateAdjList
+ * Merge with one neighbor and update the adjacency list.
*/
public void processMerge(){
byte meToNeighborDir = (byte) (incomingMsg.getFlag() & MessageFlag.DIR_MASK);
@@ -413,7 +423,7 @@
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getChainVertexId());
+ kmerSize, incomingMsg.getKmer());
}
/**
@@ -432,7 +442,7 @@
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(msg.getNeighberNode()),
- kmerSize, msg.getChainVertexId());
+ kmerSize, msg.getKmer());
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
index 4f1cefb..429282b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/LogAlgorithmForPathMergeVertex.java
@@ -11,10 +11,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
import edu.uci.ics.genomix.type.PositionWritable;
/*
* vertexId: BytesWritable
@@ -47,12 +43,11 @@
public class LogAlgorithmForPathMergeVertex extends
BasicPathMergeVertex {
- private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(1);
- private KmerBytesWritable lastKmer = new KmerBytesWritable(1);
private ArrayList<MessageWritable> receivedMsgList = new ArrayList<MessageWritable>();
+ protected MessageWritable outgoingMsg2 = new MessageWritable();
+ protected PositionWritable destVertexId2 = new PositionWritable();
- byte headFlag;
- byte oldHeadFlag;
+ byte finalFlag;
/**
* initiate kmerSize, maxIteration
*/
@@ -61,7 +56,7 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- headFlag = (byte)0;
+ headFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_HEAD);
outgoingMsg.reset();
}
@@ -76,10 +71,10 @@
sendMsg(destVertexId, outgoingMsg);
////send wantToMerge to prev
- destVertexId.set(getPreDestVertexId(getVertexValue()));
- outgoingMsg.setFlag(outFlag);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, outgoingMsg);
+ destVertexId2.set(getPreDestVertexId(getVertexValue()));
+ outgoingMsg2.setFlag(outFlag);
+ outgoingMsg2.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId2, outgoingMsg2);
}
/**
@@ -89,9 +84,9 @@
int countHead = 0;
int countOldHead = 0;
for(int i = 0; i < receivedMsgList.size(); i++){
- if((byte)(receivedMsgList.get(i).getFlag() | MessageFlag.IS_HEAD) > 0)
+ if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_HEAD) > 0)
countHead++;
- if((byte)(receivedMsgList.get(i).getFlag() | MessageFlag.IS_OLDHEAD) > 0)
+ if((byte)(receivedMsgList.get(i).getFlag() & MessageFlag.IS_OLDHEAD) > 0)
countOldHead++;
}
if(countHead == 0 && countOldHead == 0)
@@ -100,8 +95,10 @@
return MessageFromHead.BothMsgsFromHead;
else if(countOldHead == 2)
return MessageFromHead.BothMsgsFromOldHead;
- else if(countHead == 1)
- return MessageFromHead.OneMsgFromHead;
+ else if(countHead == 1 && headFlag == 0)
+ return MessageFromHead.OneMsgFromHeadToNonHead;
+ else if(countHead == 1 && headFlag > 0)
+ return MessageFromHead.OneMsgFromHeadToHead;
else if(countOldHead == 1)
return MessageFromHead.OneMsgFromNonHead;
@@ -125,29 +122,36 @@
processMerge(receivedMsgList.get(i));
break;
case MessageFromHead.BothMsgsFromHead:
+ case MessageFromHead.BothMsgsFromOldHead:
for(int i = 0; i < 2; i++)
processMerge(receivedMsgList.get(i));
getVertexValue().setState(MessageFlag.IS_FINAL);
break;
- case MessageFromHead.BothMsgsFromOldHead:
- deleteVertex(getVertexId());
- break;
- case MessageFromHead.OneMsgFromHead:
+ case MessageFromHead.OneMsgFromHeadToNonHead:
for(int i = 0; i < 2; i++)
processMerge(receivedMsgList.get(i));
getVertexValue().setState(MessageFlag.IS_HEAD);
break;
+ case MessageFromHead.OneMsgFromHeadToHead: //stop condition
+ for(int i = 0; i < 2; i++){
+ if(headFlag > 0){
+ processMerge(receivedMsgList.get(i));
+ break;
+ }
+ }
+ getVertexValue().setState(MessageFlag.IS_FINAL);
+ //voteToHalt();
+ break;
case MessageFromHead.OneMsgFromNonHead:
//halt
- voteToHalt();
+ //voteToHalt();
break;
}
- } else
- voteToHalt();
+ }
//send out wantToMerge msg
- headFlag = (byte)(getVertexValue().getState() | MessageFlag.IS_HEAD);
- oldHeadFlag = (byte)(getVertexValue().getState() | MessageFlag.IS_OLDHEAD);
- outFlag = (byte)(headFlag | oldHeadFlag);
+ resetHeadFlag();
+ finalFlag = (byte)(getVertexValue().getState() & MessageFlag.IS_FINAL);
+ outFlag = (byte)(headFlag | finalFlag);
if(outFlag == 0)
sendOutMsg();
}
@@ -156,25 +160,10 @@
* path response message to head
*/
public void responseMsgToHeadVertex(Iterator<MessageWritable> msgIterator) {
- sendMergeMsg();
- }
-
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public boolean mergeChainVertex() {
- //merge chain
- lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- KmerBytesWritable chainVertexId = kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer);
- getVertexValue().setKmer(chainVertexId);
- getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
- if (VertexUtil.isCycle(kmerFactory.getFirstKmerFromChain(kmerSize, getVertexValue().getKmer()),
- chainVertexId, kmerSize)) {
- getVertexValue().setState(State.CYCLE);
- return false;
- }
- return true;
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ sendMergeMsg();
+ }
}
@Override
@@ -186,10 +175,10 @@
initState(msgIterator);
else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- //voteToHalt();
+ if(headFlag == 0)
+ voteToHalt();
} else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- voteToHalt();
} else
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
index e8fb61a..0b636ee 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/NaiveAlgorithmForPathMergeVertex.java
@@ -185,7 +185,7 @@
public void mergeChainVertex() {
//merge chain
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
+ incomingMsg.getKmer()));
getVertexValue().setKmer(kmerFactory.mergeTwoKmer(getVertexValue().getKmer(), lastKmer));
getVertexValue().setOutgoingList(incomingMsg.getNeighberNode());
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
index 3e1b87e..69e1fb6 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P3ForPathMergeVertex.java
@@ -231,7 +231,7 @@
*/
public void mergeChainVertex(){
lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
+ incomingMsg.getKmer()));
getVertexValue().setKmer(
kmerFactory.mergeTwoKmer(getVertexValue().getKmer(),
lastKmer));
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 2457c1c..f250afc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -159,10 +159,10 @@
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor(); // TODO: rename "Up" -> "Update" and "From" -> "To"
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
- sendUpMsgFromSuccessor();
+ sendUpMsgToSuccessor();
}
} else {
// I'm a tail
@@ -170,19 +170,19 @@
if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
- sendUpMsgFromPredecessor();
+ sendUpMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
- sendUpMsgFromSuccessor();
+ sendUpMsgToSuccessor();
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
index 18e7d23..27eb40a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P5ForPathMergeVertex.java
@@ -420,7 +420,7 @@
getVertexValue().processMerges(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, VertexUtil.getNodeIdFromAdjacencyList(incomingMsg.getNeighberNode()),
- kmerSize, incomingMsg.getChainVertexId());
+ kmerSize, incomingMsg.getKmer());
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
index e7a0ff1..f343c2e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
@@ -2,10 +2,12 @@
public class MessageFromHead {
public static final byte BothMsgsFromHead = 1 << 0;
- public static final byte BothMsgsFromNonHead = 2 << 0;
- public static final byte BothMsgsFromOldHead = 3 << 0;
- public static final byte OneMsgFromHead = 4 << 1;
- public static final byte OneMsgFromNonHead = 5 << 1;
+ public static final byte BothMsgsFromNonHead = 1 << 1;
+ public static final byte BothMsgsFromOldHead = 1 << 2;
+ public static final byte OneMsgFromHead = 1 << 3;
+ public static final byte OneMsgFromNonHead = 1 << 4;
+ public static final byte OneMsgFromHeadToNonHead = 1 << 5;
+ public static final byte OneMsgFromHeadToHead = 1 << 6;
public static final byte NO_INFO = 0 << 0;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 7e56c65..e9197db 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -117,7 +117,9 @@
public static PositionWritable getNodeIdFromAdjacencyList(AdjacencyListWritable adj){
if(adj.getForwardList().getCountOfPosition() > 0)
return adj.getForwardList().getPosition(0);
- else
+ else if(adj.getReverseList().getCountOfPosition() > 0)
return adj.getReverseList().getPosition(0);
+ else
+ return null;
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index f52957e..a63da20 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -183,8 +183,8 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- //genLogAlgorithmForMergeGraph();
- genP3ForMergeGraph();
+ genLogAlgorithmForMergeGraph();
+ //genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
//genBridgeAddGraph();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index f86ef23..cd81a97 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,7 +45,7 @@
public static final String PreFix = "data/input"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- + "read"};/*, PreFix + File.separator
+ + "pathmerge"};/*, PreFix + File.separator
/*+ "CyclePath"};, PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
index c5aa0eb..355bc7a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/graphbuilding/JobRunStepByStepTest.java
@@ -46,11 +46,11 @@
@SuppressWarnings("deprecation")
public class JobRunStepByStepTest {
private static final int KmerSize = 3;
- private static final int ReadLength = 8;
+ private static final int ReadLength = 7;
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_INPUT_PATH = "data/graphbuild.test/tworeads.txt";
+ private static final String DATA_INPUT_PATH = "data/graphbuild.test/read.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -71,8 +71,8 @@
@Test
public void TestAll() throws Exception {
- TestEndToEnd();
- //TestUnMergedNode();
+ //TestEndToEnd();
+ TestUnMergedNode();
}
public void TestEndToEnd() throws Exception {
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
deleted file mode 100644
index 6cfeda1..0000000
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P3ForMergeGraph.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
-<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
-<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
-<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
-<property><name>P3ForPathMergeVertex.kmerSize</name><value>3</value></property>
-<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
-<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
-<property><name>mapred.submit.replication</name><value>10</value></property>
-<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
-<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
-<property><name>P3ForPathMergeVertex.pseudoRate</name><value>0.3</value></property>
-<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
-<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
-<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
-<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
-<property><name>keep.failed.task.files</name><value>false</value></property>
-<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
-<property><name>mapred.output.value.class</name><value>edu.uci.ics.genomix.pregelix.io.ValueStateWritable</value></property>
-<property><name>io.bytes.per.checksum</name><value>512</value></property>
-<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
-<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
-<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
-<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
-<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
-<property><name>fs.checkpoint.period</name><value>3600</value></property>
-<property><name>mapred.child.tmp</name><value>./tmp</value></property>
-<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
-<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
-<property><name>hadoop.logfile.count</name><value>10</value></property>
-<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
-<property><name>io.map.index.skip</name><value>0</value></property>
-<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
-<property><name>mapred.output.compress</name><value>false</value></property>
-<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
-<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
-<property><name>fs.checkpoint.size</name><value>67108864</value></property>
-<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
-<property><name>mapred.job.name</name><value>P3ForMergeGraph</value></property>
-<property><name>local.cache.size</name><value>10737418240</value></property>
-<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
-<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
-<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
-<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
-<property><name>mapred.task.timeout</name><value>600000</value></property>
-<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
-<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
-<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
-<property><name>ipc.client.kill.max</name><value>10</value></property>
-<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
-<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
-<property><name>io.sort.record.percent</name><value>0.05</value></property>
-<property><name>hadoop.security.authorization</name><value>false</value></property>
-<property><name>mapred.max.tracker.failures</name><value>4</value></property>
-<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
-<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
-<property><name>mapred.map.tasks</name><value>2</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
-<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
-<property><name>fs.default.name</name><value>file:///</value></property>
-<property><name>mapred.output.key.class</name><value>edu.uci.ics.genomix.type.PositionWritable</value></property>
-<property><name>P3ForPathMergeVertex.maxRound</name><value>2</value></property>
-<property><name>tasktracker.http.threads</name><value>40</value></property>
-<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
-<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
-<property><name>mapred.reduce.tasks</name><value>1</value></property>
-<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
-<property><name>pregelix.vertexClass</name><value>edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex</value></property>
-<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
-<property><name>io.file.buffer.size</name><value>4096</value></property>
-<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
-<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
-<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
-<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
-<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
-<property><name>hadoop.native.lib</name><value>true</value></property>
-<property><name>fs.s3.block.size</name><value>67108864</value></property>
-<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
-<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
-<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
-<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
-<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
-<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
-<property><name>mapred.queue.names</name><value>default</value></property>
-<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
-<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
-<property><name>mapred.job.tracker</name><value>local</value></property>
-<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
-<property><name>fs.s3.maxRetries</name><value>4</value></property>
-<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
-<property><name>fs.trash.interval</name><value>0</value></property>
-<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
-<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
-<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
-<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
-<property><name>io.sort.mb</name><value>100</value></property>
-<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
-<property><name>io.sort.factor</name><value>10</value></property>
-<property><name>mapred.task.profile</name><value>false</value></property>
-<property><name>job.end.retry.interval</name><value>30000</value></property>
-<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
-<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
-<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
-<property><name>webinterface.private.actions</name><value>false</value></property>
-<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
-<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
-<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
-<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
-<property><name>mapred.compress.map.output</name><value>false</value></property>
-<property><name>io.sort.spill.percent</name><value>0.80</value></property>
-<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
-<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
-<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
-<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
-<property><name>job.end.retry.attempts</name><value>0</value></property>
-<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
-<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.DataCleanOutputFormat</value></property>
-<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
-<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
-<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
-<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat</value></property>
-<property><name>mapred.job.queue.name</name><value>default</value></property>
-<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
-<property><name>pregelix.incStateLength</name><value>true</value></property>
-<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
-<property><name>topology.script.number.args</name><value>100</value></property>
-<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
-<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
-<property><name>mapred.task.cache.levels</name><value>2</value></property>
-<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
-<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
-<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
-<property><name>mapred.min.split.size</name><value>0</value></property>
-<property><name>mapred.map.max.attempts</name><value>4</value></property>
-<property><name>jobclient.output.filter</name><value>FAILED</value></property>
-<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
-<property><name>mapred.acls.enabled</name><value>false</value></property>
-</configuration>
\ No newline at end of file