Add additional merge info to pathmerge
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 762712e..68743cc 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -92,7 +92,11 @@
this();
setAsReference(data, offset);
}
-
+
+ public NodeWritable getNode(){
+ return this;
+ }
+
public void setAsCopy(NodeWritable node) {
setAsCopy(node.edges, node.startReads, node.endReads, node.internalKmer, node.averageCoverage);
}
@@ -470,5 +474,5 @@
public boolean isStartReadOrEndRead(){
return startReads.getCountOfPosition() > 0 || endReads.getCountOfPosition() > 0;
}
-
+
}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index c3aefe6..d9929cd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -5,6 +5,7 @@
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.type.EdgeListWritable;
import edu.uci.ics.genomix.type.EdgeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerListWritable;
@@ -14,7 +15,12 @@
private static final long serialVersionUID = 1L;
- public static class State extends VertexStateFlag{
+ public static class State extends VertexStateFlag{
+ public static final byte HEAD_SHOULD_MERGEWITHPREV = 0b0 << 2; //use for initiating head
+ public static final byte HEAD_SHOULD_MERGEWITHNEXT = 0b1 << 2;
+ public static final byte HEAD_SHOULD_MERGE_MASK = 0b1 << 2;
+ public static final byte HEAD_SHOULD_MERGE_CLEAR = (byte) 11001011;
+
public static final byte NO_MERGE = 0b00 << 3;
public static final byte SHOULD_MERGEWITHNEXT = 0b01 << 3;
public static final byte SHOULD_MERGEWITHPREV = 0b10 << 3;
@@ -60,12 +66,12 @@
traverseMap = new HashMapWritable<VKmerBytesWritable, VKmerListWritable>();
}
- public NodeWritable getNode(){
- NodeWritable node = new NodeWritable();
- node.setAsCopy(this.getEdges(), this.getStartReads(), this.getEndReads(),
- this.getInternalKmer(), this.getAverageCoverage());
- return node;
- }
+// public NodeWritable getNode(){
+// NodeWritable node = new NodeWritable();
+// node.setAsCopy(this.getEdges(), this.getStartReads(), this.getEndReads(),
+// this.getInternalKmer(), this.getAverageCoverage());
+// return super();
+// }
public void setNode(NodeWritable node){
super.setAsCopy(node.getEdges(), node.getStartReads(), node.getEndReads(),
@@ -216,15 +222,17 @@
*/
public void processMerges(byte neighborToDeleteDir, VKmerBytesWritable nodeToDelete,
byte neighborToMergeDir, EdgeWritable nodeToAdd,
- int kmerSize, VKmerBytesWritable kmer){
+ int kmerSize, NodeWritable node){
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
byte deleteDir = (byte)(neighborToDeleteDir & MessageFlag.DIR_MASK);
- this.getEdgeList(deleteDir).remove(nodeToDelete);
- this.getInternalKmer().mergeWithKmerInDir(deleteDir, kmerSize, kmer);
-
- if(nodeToAdd != null){
- byte mergeDir = (byte)(neighborToMergeDir & MessageFlag.DIR_MASK);
- this.getEdgeList(mergeDir).add(nodeToAdd);
- }
+// this.getEdgeList(deleteDir).remove(nodeToDelete);
+ super.getNode().mergeWithNode(deleteDir, node);
+// this.getInternalKmer().mergeWithKmerInDir(deleteDir, kmerSize, kmer);
+
+// if(nodeToAdd != null){
+// byte mergeDir = (byte)(neighborToMergeDir & MessageFlag.DIR_MASK);
+// this.getEdgeList(mergeDir).add(nodeToAdd);
+// }
}
public boolean hasPathTo(VKmerBytesWritable nodeToSeek){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index cf8fac7..065e181 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -3,7 +3,6 @@
import java.util.logging.*;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
import edu.uci.ics.genomix.type.KmerBytesWritable;
public class LogAlgorithmLogFormatter extends Formatter {
@@ -13,7 +12,6 @@
private long step;
private KmerBytesWritable sourceVertexId = new KmerBytesWritable();
private KmerBytesWritable destVertexId = new KmerBytesWritable();
- private MessageWritable msg = new MessageWritable();
private KmerBytesWritable mergeChain = new KmerBytesWritable();
//private boolean testDelete = false;
/**
@@ -32,7 +30,6 @@
this.step = step;
this.sourceVertexId.setAsCopy(sourceVertexId);
this.destVertexId.setAsCopy(destVertexId);
- this.msg = msg;
this.operation = 0;
}
@@ -54,7 +51,6 @@
public void reset() {
this.sourceVertexId = new KmerBytesWritable();
this.destVertexId = new KmerBytesWritable();
- this.msg = new MessageWritable();
this.mergeChain = new KmerBytesWritable();
}
@@ -71,7 +67,6 @@
builder.append("Send message to " + "\r\n");
builder.append("Destination Code: " + dest + "\r\n");
}
- builder.append("Message is: " + Message.MESSAGE_CONTENT.getContentFromCode(msg.getFlag()) + "\r\n");
}
if (operation == 2) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 9dbe50f..ff45b45 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -32,6 +32,7 @@
protected byte outFlag;
protected byte inFlag;
protected byte selfFlag;
+ protected byte headMergeDir;
public static boolean fakeVertexExist = false;
protected static VKmerBytesWritable fakeVertex = null;
@@ -67,6 +68,10 @@
return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
}
+ public byte getHeadMergeDir(){
+ return (byte) (getVertexValue().getState() & State.HEAD_SHOULD_MERGE_MASK);
+ }
+
/**
* set head state
*/
@@ -313,23 +318,35 @@
* start sending message
*/
public void startSendMsg() {
- if (VertexUtil.isHeadWithoutIndegree(getVertexValue())) {
- outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
+ if (VertexUtil.isHeadVertexWithManyOutgoing(getVertexValue())) {
+ outFlag = 0;
+ outFlag |= MessageFlag.IS_HEAD;
+ outFlag |= MessageFlag.HEAD_SHOULD_MERGEWITHNEXT;
+ outgoingMsg.setFlag(outFlag);
sendMsgToAllNextNodes();
voteToHalt();
}
- if (VertexUtil.isRearVertexWithOutdegree(getVertexValue())) {
- outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
+ if (VertexUtil.isRearVertexWithManyIncoming(getVertexValue())) {
+ outFlag = 0;
+ outFlag |= MessageFlag.IS_HEAD;
+ outFlag |= MessageFlag.HEAD_SHOULD_MERGEWITHPREV;
+ outgoingMsg.setFlag(outFlag);
sendMsgToAllPreviousNodes();
voteToHalt();
}
- if (VertexUtil.isHeadVertexWithIndegree(getVertexValue())){
- outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
+ if (VertexUtil.isHeadVertexWithOnlyOneOutgoing(getVertexValue())){
+ outFlag = 0;
+ outFlag |= MessageFlag.IS_HEAD;
+ outFlag |= MessageFlag.HEAD_SHOULD_MERGEWITHNEXT;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
- if (VertexUtil.isRearWithoutOutdegree(getVertexValue())){
- outgoingMsg.setFlag((byte)(MessageFlag.IS_HEAD));
+ if (VertexUtil.isRearVertexWithOnlyOneIncoming(getVertexValue())){
+ outFlag = 0;
+ outFlag |= MessageFlag.IS_HEAD;
+ outFlag |= MessageFlag.HEAD_SHOULD_MERGEWITHPREV;
+ outgoingMsg.setFlag(outFlag);
sendMsg(getVertexId(), outgoingMsg); //send to itself
voteToHalt();
}
@@ -340,18 +357,48 @@
*/
public void initState(Iterator<M> msgIterator) {
while (msgIterator.hasNext()) {
- if (!VertexUtil.isPathVertex(getVertexValue())
- && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
- && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
- msgIterator.next();
- voteToHalt();
- } else {
- incomingMsg = msgIterator.next();
- if(getHeadFlag() > 0)
- voteToHalt();
- else
- getVertexValue().setState(incomingMsg.getFlag());
+ incomingMsg = msgIterator.next();
+ byte headMergeDir = (byte) (incomingMsg.getFlag() & MessageFlag.HEAD_SHOULD_MERGE_MASK);
+ switch(headMergeDir){
+ case MessageFlag.HEAD_SHOULD_MERGEWITHPREV:
+ if(VertexUtil.isValidRear(getVertexValue())){
+ /** not set up yet **/
+ if(getHeadFlag() != MessageFlag.IS_HEAD){
+ getVertexValue().setState(incomingMsg.getFlag());
+ } else{ /** already set up **/
+ /** if headMergeDir are not the same **/
+ if(getHeadMergeDir() != headMergeDir){
+ getVertexValue().setState(MessageFlag.IS_NON);
+ }
+ }
+ }
+ break;
+ case MessageFlag.HEAD_SHOULD_MERGEWITHNEXT:
+ if(VertexUtil.isValidHead(getVertexValue())){
+ /** not set up yet **/
+ if(getHeadFlag() != MessageFlag.IS_HEAD){
+ getVertexValue().setState(incomingMsg.getFlag());
+ } else{ /** already set up **/
+ /** if headMergeDir are not the same **/
+ if(getHeadMergeDir() != headMergeDir){
+ getVertexValue().setState(MessageFlag.IS_NON);
+ }
+ }
+ }
+ break;
}
+// if (!VertexUtil.isPathVertex(getVertexValue())
+// && !VertexUtil.isHeadWithoutIndegree(getVertexValue())
+// && !VertexUtil.isRearWithoutOutdegree(getVertexValue())) {
+// msgIterator.next();
+// voteToHalt();
+// } else {
+// incomingMsg = msgIterator.next();
+// if(getHeadFlag() > 0)
+// voteToHalt();
+// else
+// getVertexValue().setState(incomingMsg.getFlag());
+// }
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index d9391b8..32ce597 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -49,6 +49,24 @@
processMerge(incomingMsg);
}
+ public byte flipHeadMergeDir(byte d, boolean isFlip){
+ if(isFlip){
+ switch(d){
+ case State.HEAD_SHOULD_MERGEWITHPREV:
+ return State.HEAD_SHOULD_MERGEWITHNEXT;
+ case State.HEAD_SHOULD_MERGEWITHNEXT:
+ return State.HEAD_SHOULD_MERGEWITHPREV;
+ default:
+ return 0;
+ }
+ } else
+ return d;
+ }
+
+ public boolean isDifferentDirWithMergeKmer(byte neighborToMeDir){
+ return neighborToMeDir == MessageFlag.DIR_FR || neighborToMeDir == MessageFlag.DIR_RF;
+ }
+
/**
* merge and updateAdjList having parameter
*/
@@ -59,7 +77,10 @@
if((inFlag & MessageFlag.IS_HEAD) > 0){
byte state = getVertexValue().getState();
+ state &= State.HEAD_SHOULD_MERGE_CLEAR;
state |= State.IS_HEAD;
+ byte headMergeDir = flipHeadMergeDir((byte)(inFlag & MessageFlag.HEAD_SHOULD_MERGE_MASK), isDifferentDirWithMergeKmer(neighborToMeDir));
+ state |= headMergeDir;
getVertexValue().setState(state);
}
@@ -67,7 +88,7 @@
getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
neighborToMergeDir, msg.getNeighborEdge(),
- kmerSize, msg.getInternalKmer());
+ kmerSize, msg.getNode());
}
/**
@@ -117,9 +138,10 @@
break;
}
- getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
- neighborToMergeDir, msg.getNeighborEdge(),
- kmerSize, tmpKmer);
+ //TODO: fix mergeWithNode
+// getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
+// neighborToMergeDir, msg.getNeighborEdge(),
+// kmerSize, tmpKmer);
}
/**
@@ -353,9 +375,9 @@
outgoingMsg.setFlip(ifFilpWithSuccessor());
for(byte d: OutgoingListFlag.values)
outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
- outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
+ outgoingMsg.setNode(getVertexValue().getNode());
+// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
sendMsg(getPrevDestVertexId(), outgoingMsg);
-// deleteVertex(getVertexId());
}
public void configureMergeMsgForSuccessor(){
@@ -365,9 +387,9 @@
outgoingMsg.setFlip(ifFlipWithPredecessor());
for(byte d: IncomingListFlag.values)
outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
- outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
+ outgoingMsg.setNode(getVertexValue().getNode());
+// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
sendMsg(getNextDestVertexId(), outgoingMsg);
-// deleteVertex(getVertexId());
}
/**
@@ -377,13 +399,11 @@
setPredecessorAdjMsg();
outgoingMsg.setFlag(outFlag);
outgoingMsg.setFlip(ifFilpWithSuccessor());
-// outFlag &= MessageFlag.DIR_CLEAR;
-// outFlag |= meToNeighborDir;
-// outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
for(byte d: OutgoingListFlag.values)
outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
- outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
+ outgoingMsg.setNode(getVertexValue().getNode());
+// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
}
@@ -391,21 +411,21 @@
setSuccessorAdjMsg();
outgoingMsg.setFlag(outFlag);
outgoingMsg.setFlip(ifFlipWithPredecessor());
-// outFlag &= MessageFlag.DIR_CLEAR;
-// outFlag |= meToNeighborDir;
-// outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
for(byte d: IncomingListFlag.values)
outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
- outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
+ outgoingMsg.setNode(getVertexValue().getNode());
+// outgoingMsg.setInternalKmer(getVertexValue().getInternalKmer());
sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
}
/**
* send merge message to neighber for P4
*/
public void broadcastMergeMsg(){
- if(headFlag > 0)
+ if(headFlag > 0){
outFlag |= MessageFlag.IS_HEAD;
+ outFlag |= headMergeDir;
+ }
switch(getVertexValue().getState() & State.SHOULD_MERGE_MASK) {
case State.SHOULD_MERGEWITHNEXT:
/** configure merge msg for successor **/
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
index bb2d162..489a059 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P2ForPathMergeVertex.java
@@ -11,7 +11,7 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
-import edu.uci.ics.genomix.pregelix.type.MessageFromHead;
+import edu.uci.ics.genomix.pregelix.type.MessageTypeFromHead;
import edu.uci.ics.genomix.type.VKmerListWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/*
@@ -124,15 +124,15 @@
}
}
if(countHead == 2)
- return MessageFromHead.BothMsgsFromHead;
+ return MessageTypeFromHead.BothMsgsFromHead;
else if(countHead == 1 && countOldHead == 1)
- return MessageFromHead.OneMsgFromOldHeadAndOneFromHead;
+ return MessageTypeFromHead.OneMsgFromOldHeadAndOneFromHead;
else if(countHead == 1 && countOldHead == 0)
- return MessageFromHead.OneMsgFromHeadAndOneFromNonHead;
+ return MessageTypeFromHead.OneMsgFromHeadAndOneFromNonHead;
else if(countHead == 0 && countOldHead == 0)
- return MessageFromHead.BothMsgsFromNonHead;
+ return MessageTypeFromHead.BothMsgsFromNonHead;
else
- return MessageFromHead.NO_MSG;
+ return MessageTypeFromHead.NO_MSG;
}
/**
@@ -151,8 +151,8 @@
/** process merge when receiving msg **/
byte numOfMsgsFromHead = checkNumOfMsgsFromHead();
switch(numOfMsgsFromHead){
- case MessageFromHead.BothMsgsFromHead:
- case MessageFromHead.OneMsgFromOldHeadAndOneFromHead:
+ case MessageTypeFromHead.BothMsgsFromHead:
+ case MessageTypeFromHead.OneMsgFromOldHeadAndOneFromHead:
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i)); //processMerge()
getVertexValue().setState(State.IS_FINAL);
@@ -160,17 +160,17 @@
sendMsgToFakeVertex();
voteToHalt();
break;
- case MessageFromHead.OneMsgFromHeadAndOneFromNonHead:
+ case MessageTypeFromHead.OneMsgFromHeadAndOneFromNonHead:
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i));
setHeadState();
this.activate();
break;
- case MessageFromHead.BothMsgsFromNonHead:
+ case MessageTypeFromHead.BothMsgsFromNonHead:
for(int i = 0; i < 2; i++)
processFinalMerge(receivedMsgList.get(i));
break;
- case MessageFromHead.NO_MSG:
+ case MessageTypeFromHead.NO_MSG:
//halt
voteToHalt(); //deleteVertex(getVertexId());
break;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 1716ca0..f949092 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -63,7 +63,6 @@
private boolean curHead;
private boolean nextHead;
private boolean prevHead;
- private byte selfFlag;
/**
* initiate kmerSize, maxIteration
@@ -93,13 +92,12 @@
outFlag = (byte)0;
inFlag = (byte)0;
// Node may be marked as head b/c it's a real head or a real tail
- headFlag = (byte) (State.IS_HEAD & getVertexValue().getState());
+ headFlag = getHeadFlag();
+ headMergeDir = getHeadMergeDir();
}
protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
// "deterministically random", based on node id
- //randGenerator.setSeed(randSeed);
- //randSeed = randGenerator.nextInt();
randGenerator.setSeed((randSeed ^ nodeKmer.hashCode()) * 100000 * getSuperstep());//randSeed + nodeID.hashCode()
for(int i = 0; i < 500; i++)
randGenerator.nextFloat();
@@ -148,8 +146,6 @@
else if (getSuperstep() == 2)
initState(msgIterator);
else if (getSuperstep() % 4 == 3){
- //tailFlag = (byte) (MessageFlag.IS_TAIL & getVertexValue().getState());
- //outFlag = (byte) (headFlag | tailFlag);
outFlag |= headFlag;
outFlag |= MessageFlag.NO_MERGE;
@@ -162,8 +158,8 @@
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue());//&& headFlag == 0;
- hasPrev = setPrevInfo(getVertexValue());//&& headFlag == 0;
+ hasNext = setNextInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHNEXT));
+ hasPrev = setPrevInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHPREV));
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
deleted file mode 100644
index ca8d795..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/AdjMessage.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package edu.uci.ics.genomix.pregelix.type;
-
-public class AdjMessage {
- public static final byte FROMFF = 0;
- public static final byte FROMFR = 1;
- public static final byte FROMRF = 2;
- public static final byte FROMRR = 3;
- public static final byte NON = 4;
- public static final byte UNCHANGE = 5;
- public static final byte MERGE = 6;
- public static final byte KILL = 7;
-
- public final static class ADJMESSAGE_CONTENT {
- public static String getContentFromCode(byte code) {
- String r = "";
- switch (code) {
- case FROMFF:
- r = "FROMFF";
- break;
- case FROMFR:
- r = "FROMFR";
- break;
- case FROMRF:
- r = "FROMRF";
- break;
- case FROMRR:
- r = "FROMRR";
- break;
- case NON:
- r = "NON";
- break;
- case UNCHANGE:
- r = "UNCHANGE";
- break;
- case MERGE:
- r = "MERGE";
- break;
- case KILL:
- r = "KILL";
- break;
- }
- return r;
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
deleted file mode 100644
index a845f8a..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package edu.uci.ics.genomix.pregelix.type;
-
-public class CheckMessage {
-
- public static final byte SOURCE = 1 << 0;
- public static final byte INTERNALKMER = 1 << 1;
- public static final byte NEIGHBER = 1 << 2;
- public static final byte PATHLIST = 1 << 3;
- public static final byte NODEIDLIST = 1 << 4;
- public static final byte ADJMSG = 1 << 5;
- public static final byte START = 1 << 6;
-
- public final static class CheckMessage_CONTENT {
-
- public static String getContentFromCode(byte code) {
- String r = "";
- switch (code) {
- case SOURCE:
- r = "SOURCE";
- break;
- case INTERNALKMER:
- r = "INTERNALKMER";
- break;
- case NEIGHBER:
- r = "NEIGHBER";
- break;
- case PATHLIST:
- r = "PATHLIST";
- break;
- case NODEIDLIST:
- r = "READID";
- break;
- case ADJMSG:
- r = "ADJMSG";
- break;
- case START:
- r = "START";
- break;
- }
- return r;
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
deleted file mode 100644
index 4644383..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package edu.uci.ics.genomix.pregelix.type;
-
-public class Message {
-
- public static final byte NON = 0;
- public static final byte START = 1;
- public static final byte END = 2;
- public static final byte STOP = 3;
- public static final byte FROMPSEUDOHEAD = 4;
- public static final byte FROMPSEUDOREAR = 5;
- public static final byte IN = 6;
- public static final byte OUT = 7;
-
- public final static class MESSAGE_CONTENT {
-
- public static String getContentFromCode(byte code) {
- String r = "";
- switch (code) {
- case NON:
- r = "NON";
- break;
- case START:
- r = "START";
- break;
- case END:
- r = "END";
- break;
- case STOP:
- r = "STOP";
- break;
- case FROMPSEUDOHEAD:
- r = "FROMPSEUDOHEAD";
- break;
- case FROMPSEUDOREAR:
- r = "FROMPSEUDOREAR";
- break;
- case IN:
- r = "IN";
- break;
- case OUT:
- r = "OUT";
- break;
- }
- return r;
- }
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageTypeFromHead.java
similarity index 95%
rename from genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
rename to genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageTypeFromHead.java
index 05a2f95..5a49ee7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageFromHead.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/MessageTypeFromHead.java
@@ -1,6 +1,6 @@
package edu.uci.ics.genomix.pregelix.type;
-public class MessageFromHead {
+public class MessageTypeFromHead {
public static final byte BothMsgsFromHead = 0b0000 << 1;
public static final byte BothMsgsFromNonHead = 0b0001 << 1;
public static final byte BothMsgsFromOldHead = 0b0010 << 1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
index 5ba39fa..b6ee29d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/util/VertexUtil.java
@@ -136,4 +136,20 @@
else
return null;
}
+
+ /**
+ * check if vertex is a valid head
+ * valid head = 1. path node || 2. only one outgoing + no path node
+ */
+ public static boolean isValidHead(VertexValueWritable value){
+ return isPathVertex(value) || (value.outDegree() == 1 && !isPathVertex(value));
+ }
+
+ /**
+ * check if vertex is a valid Rear
+ * valid head = 1. path node || 2. only one incoming + no path node
+ */
+ public static boolean isValidRear(VertexValueWritable value){
+ return isPathVertex(value) || (value.inDegree() == 1 && !isPathVertex(value));
+ }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 2939dd8..e1ae674 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -118,7 +118,7 @@
private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(P4ForPathMergeVertex.class);
- job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(VKmerBytesWritable.class);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
index ee18c75..4fd8656 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
@@ -6,15 +6,15 @@
public static Test suite() throws Exception {
String pattern ="PathMerge";
- String testSet[] = //{"2", "3", "4", "5", "6", "7", "8", "9"};
- {
+ String testSet[] = {"2", "3", "4", "5", "6", "7", "8", "9"};
+// {
// "SimplePath",
// "ThreeDuplicate",
- "head_6"
+// "head_6"
// "CyclePath",
// "SelfPath"
// "TreePath"
- };
+// };
init(pattern, testSet);
BasicGraphCleanTestSuite testSuite = new BasicGraphCleanTestSuite();
return makeTestSuite(testSuite);
diff --git a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
index b0f07b3..16e0c69 100644
--- a/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
+++ b/genomix/genomix-pregelix/src/test/resources/jobs/P4ForMergeGraph.xml
@@ -122,7 +122,7 @@
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
-<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>pregelix.incStateLength</name><value>true</value></property>