Code review on P4 -- 2013.9.16
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index 13ce8ec..dd2f08c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -254,6 +254,7 @@
* Process any changes to value. This is for edge updates. nodeToAdd should be only edge
*/
public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
+ // TODO remove this function (use updateEdges)
byte replaceDir = mirrorDirection(deleteDir);
this.getNode().updateEdges(deleteDir, toDelete, updateDir, replaceDir, other, true);
}
@@ -267,8 +268,8 @@
* Process any changes to value. This is for merging. nodeToAdd should be only edge
*/
public void processMerges(byte mergeDir, NodeWritable node, int kmerSize){
- KmerBytesWritable.setGlobalKmerLength(kmerSize);
- mergeDir = (byte)(mergeDir & MessageFlag.DIR_MASK);
+ KmerBytesWritable.setGlobalKmerLength(kmerSize); // TODO Do this once at the init of your function, then you don't need it as a parameter here
+ mergeDir = (byte)(mergeDir & MessageFlag.DIR_MASK); // TODO move this dir outside and remove this function
super.getNode().mergeWithNode(mergeDir, node);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index e29afa4..0d5448a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -62,7 +62,7 @@
protected short inFlag;
protected short selfFlag;
- protected EdgeListWritable incomingEdgeList = null; //SplitRepeat
+ protected EdgeListWritable incomingEdgeList = null; //SplitRepeat // TODO Push as much data to subclasses as makes sense to
protected EdgeListWritable outgoingEdgeList = null; //SplitRepeat
protected byte incomingEdgeDir = 0; //SplitRepeat
protected byte outgoingEdgeDir = 0; //SplitRepeat
@@ -104,7 +104,7 @@
return (byte)(incomingMsg.getFlag() & MessageFlag.VERTEX_MASK);
}
- public byte getHeadMergeDir(){
+ public byte getHeadMergeDir(){ // TODO push it to derived class
return (byte) (getVertexValue().getState() & State.HEAD_CAN_MERGE_MASK);
}
@@ -203,6 +203,7 @@
* get destination vertex
*/
public VKmerBytesWritable getPrevDestVertexId() {
+ // TODO check length of RF and RR == 1; throw exception otherwise
if (!getVertexValue().getRFList().isEmpty()){ //#RFList() > 0
kmerIterator = getVertexValue().getRFList().getKeys();
return kmerIterator.next();
@@ -473,11 +474,11 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
switch(getHeadMergeDir()){
- case State.NON_HEAD:
+ case State.NON_HEAD: // TODO Change name to Path
setHeadMergeDir();
activate();
break;
- case State.HEAD_CAN_MERGEWITHPREV:
+ case State.HEAD_CAN_MERGEWITHPREV: // TODO aggregate all the incomingMsgs first, then make a decision about halting
case State.HEAD_CAN_MERGEWITHNEXT:
if (getHeadFlagAndMergeDir() != getMsgFlagAndMergeDir()){
getVertexValue().setState(State.HEAD_CANNOT_MERGE);
@@ -555,7 +556,7 @@
}
public void setPredecessorToMeDir(VKmerBytesWritable toFind){
- outFlag &= MessageFlag.DIR_CLEAR;
+ outFlag &= MessageFlag.DIR_CLEAR; // TODO WHAT HAPPENS IF THE NODE IS IN YOUR R AND YOUR RF?
if(getVertexValue().getRFList().contains(toFind))
outFlag |= MessageFlag.DIR_RF;
else if(getVertexValue().getRRList().contains(toFind))
@@ -567,13 +568,14 @@
*/
public void setSuccessorToMeDir(){
outFlag &= MessageFlag.DIR_CLEAR;
- if(!getVertexValue().getFFList().isEmpty())
+ if(!getVertexValue().getFFList().isEmpty()) // TODO == 1 rather than isEmpty
outFlag |= MessageFlag.DIR_FF;
else if(!getVertexValue().getFRList().isEmpty())
outFlag |= MessageFlag.DIR_FR;
+ // TODO else exception
}
- public void setSuccessorToMeDir(VKmerBytesWritable toFind){
+ public void setSuccessorToMeDir(VKmerBytesWritable toFind){ // TODO you should never have to FIND...
outFlag &= MessageFlag.DIR_CLEAR;
if(getVertexValue().getFFList().contains(toFind))
outFlag |= MessageFlag.DIR_FF;
@@ -613,7 +615,7 @@
/**
* check if need filp
*/
- public byte flipDirection(byte neighborDir, boolean flip){
+ public byte flipDirection(byte neighborDir, boolean flip){ // TODO use NodeWritable
if(flip){
switch (neighborDir) {
case MessageFlag.DIR_FF:
@@ -840,7 +842,7 @@
* non-head && non-path
*/
public boolean isInactiveNode(){
- return VertexUtil.isUnMergeVertex(getVertexValue()) || isTandemRepeat(getVertexValue());
+ return !VertexUtil.isCanMergeVertex(getVertexValue()) || isTandemRepeat(getVertexValue());
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 57a6ea5..6f6435c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -35,11 +35,14 @@
* updateAdjList
*/
public void processUpdate(){
+ // A -> B -> C with B merging with C
inFlag = incomingMsg.getFlag();
- byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
- byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK); // A -> B dir
+ byte neighborToMeDir = mirrorDirection(meToNeighborDir); // B -> A dir
- byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
+ // TODO if you want, this logic could be figured out when sending the update from B
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip()); // A -> C after the merge
+ // TODO add C -> A dir and call node.updateEdges directly
getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
neighborToMergeDir, incomingMsg.getNode());
}
@@ -73,7 +76,7 @@
/**
* merge and updateAdjList merge with one neighbor
*/
- public void processMerge(){
+ public void processMerge(){ // TODO remove me
processMerge(incomingMsg);
}
@@ -120,7 +123,7 @@
byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
- if(isNonHeadReceivedFromHead()){
+ if(isNonHeadReceivedFromHead()){ // TODO? why sepcial-case the path vs heads? just aggregate your state flags
short state = getVertexValue().getState();
state &= State.HEAD_CAN_MERGE_CLEAR;
byte headMergeDir = flipHeadMergeDir((byte)(inFlag & MessageFlag.HEAD_CAN_MERGE_MASK), isDifferentDirWithMergeKmer(neighborToMeDir));
@@ -134,12 +137,14 @@
/**
* configure UPDATE msg boolean: true == P4, false == P2
*/
- public void configureUpdateMsgForPredecessor(boolean flag){
+ public void configureUpdateMsgForPredecessor(boolean isP4){
outgoingMsg.setSourceVertexId(getVertexId());
+ // TODO pass in isForward
for(byte d: OutgoingListFlag.values)
- outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
+ outgoingMsg.getNode().setEdgeList(d, getVertexValue().getEdgeList(d)); // TODO check
- if(flag)
+ // TODO pass in the vertexId rather than isP4 (removes this blockļ¼
+ if(isP4)
outgoingMsg.setFlip(ifFilpWithSuccessor());
else
outgoingMsg.setFlip(ifFilpWithSuccessor(incomingMsg.getSourceVertexId()));
@@ -147,7 +152,7 @@
kmerIterator = getVertexValue().getRFList().getKeys();
while(kmerIterator.hasNext()){
destVertexId.setAsCopy(kmerIterator.next());
- setPredecessorToMeDir(destVertexId);
+ setPredecessorToMeDir(destVertexId); // TODO DON'T NEED TO SEARCH for this
outgoingMsg.setFlag(outFlag);
sendMsg(destVertexId, outgoingMsg);
}
@@ -209,7 +214,7 @@
* This vertex tries to merge with next vertex and send update msg to predecesspr
*/
public void sendUpdateMsgToPredecessor(boolean flag){
- if(getVertexValue().hasNextDest())
+ if(getVertexValue().hasNextDest()) //TODO delete
broadcastUpdateMsg(flag);
}
@@ -328,7 +333,7 @@
setSuccessorToMeDir();
outgoingMsg.setFlag(outFlag);
outgoingMsg.setSourceVertexId(getVertexId());
- outgoingMsg.setFlip(ifFlipWithPredecessor());
+ outgoingMsg.setFlip(ifFlipWithPredecessor()); // TODO seems incorrect for outgoing... why predecessor? //TODO REMOVE this flip boolean completely
// for(byte d: IncomingListFlag.values)
// outgoingMsg.setEdgeList(d, getVertexValue().getEdgeList(d));
outgoingMsg.setNode(getVertexValue().getNode());
@@ -369,7 +374,7 @@
switch(getVertexValue().getState() & State.CAN_MERGE_MASK) {
case State.CAN_MERGEWITHNEXT:
// configure merge msg for successor
- configureMergeMsgForSuccessor(getNextDestVertexId());
+ configureMergeMsgForSuccessor(getNextDestVertexId()); // TODO getDestVertexId(DIRECTION), then remove the switch statement, sendMergeMsg(DIRECTION)
if(deleteSelf)
deleteVertex(getVertexId());
else{
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 665db88..c179a4d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -127,7 +127,8 @@
startSendMsg();
else if (getSuperstep() == 2)
initState(msgIterator);
- else if (getSuperstep() % 4 == 3){
+ else if (getSuperstep() % 4 == 3){
+ // TODO separate function for this block
//initiate merge_dir
setStateAsNoMerge();
@@ -137,14 +138,15 @@
// the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
// We prevent merging towards non-path nodes
- hasNext = setNextInfo(getVertexValue());
+ hasNext = setNextInfo(getVertexValue()); // TODO make this false if the node is restricted by its neighbors or by structure
hasPrev = setPrevInfo(getVertexValue());
if (hasNext || hasPrev) {
if (curHead) {
if (hasNext && !nextHead) {
// compress this head to the forward tail
setStateAsMergeWithNext();
- sendUpdateMsgToPredecessor(true);
+// configureUpdateMsg(isNextOrPrevious?, getVertexValue()); // TODO change to sendmsg or sendUpdateFrom...
+ sendUpdateMsgToPredecessor(true); //TODO all of these can be simplified
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
setStateAsMergeWithPrev();
@@ -176,15 +178,18 @@
}
}
}
- }
+ } // TODO else voteToHalt (when you combine steps 2 and 3)
this.activate();
}
- else if (getSuperstep() % 4 == 0){
+ else if (getSuperstep() % 4 == 0){
+ // TODO separate function for this step
//update neighber
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
- processUpdate();
- if(isInactiveNode() || isHeadUnableToMerge())
+ processUpdate(); // TODO pass incomingMsg as a parameter
+
+ // TODO move outside the loop
+ if(isInactiveNode() || isHeadUnableToMerge()) // check structure and neighbor restriction
voteToHalt();
else
activate();
@@ -197,19 +202,19 @@
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
/** process merge **/
- processMerge();
+ processMerge(); // TODO use incomingMsg as a parameter
// set statistics counter: Num_MergedNodes
updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
/** if it's a tandem repeat, which means detecting cycle **/
- if(isTandemRepeat(getVertexValue())){
+ if(isTandemRepeat(getVertexValue())){ // TODO check 3 node cycle to make sure the update is cocrect (try several times)
for(byte d : DirectionFlag.values)
- getVertexValue().getEdgeList(d).reset();
+ getVertexValue().getEdgeList(d).reset(); // TODO don't remove tandem repeats but DO stop merging // we shouldn't need to update neighbors
// set statistics counter: Num_TandemRepeats
- updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats);
+ updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats); // TODO cycle instead of tandem repeat
getVertexValue().setCounters(counters);
- voteToHalt();
+ voteToHalt(); // TODO make sure you're checking structure to preclude tandem repeats
}/** head meets head, stop **/
- else if(VertexUtil.isUnMergeVertex(getVertexValue()) || isHeadMeetsHead()){
+ else if(!VertexUtil.isCanMergeVertex(getVertexValue()) || isHeadMeetsHead()){
getVertexValue().setState(State.HEAD_CANNOT_MERGE);
// set statistics counter: Num_MergedPaths
updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
@@ -217,7 +222,7 @@
voteToHalt();
}
else{
- getVertexValue().setCounters(counters);
+ getVertexValue().setCounters(counters); // TODO move all the setCounter calls outside the if/else blocks
activate();
}
}