add updateEdges for pathmerge. Issue: mergeWithStartreads
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 1cbf196..a83d7d3 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -353,7 +353,33 @@
mergeCoverage(other);
internalKmer.mergeWithKmerInDir(dir, KmerBytesWritable.lettersInKmer, other.internalKmer);
}
-
+
+
+ /**
+ * update my edge list
+ */
+ public void updateEdges(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
+ edges[deleteDir].remove(toDelete);
+ switch (updateDir) {
+ case DirectionFlag.DIR_FF:
+ edges[DirectionFlag.DIR_FF].unionUpdate(other.edges[DirectionFlag.DIR_FF]);
+ edges[DirectionFlag.DIR_FR].unionUpdate(other.edges[DirectionFlag.DIR_FR]);
+ break;
+ case DirectionFlag.DIR_FR:
+ edges[DirectionFlag.DIR_FF].unionUpdate(other.edges[DirectionFlag.DIR_RF]);
+ edges[DirectionFlag.DIR_FR].unionUpdate(other.edges[DirectionFlag.DIR_RR]);
+ break;
+ case DirectionFlag.DIR_RF:
+ edges[DirectionFlag.DIR_RF].unionUpdate(other.edges[DirectionFlag.DIR_FF]);
+ edges[DirectionFlag.DIR_RR].unionUpdate(other.edges[DirectionFlag.DIR_FR]);
+ break;
+ case DirectionFlag.DIR_RR:
+ edges[DirectionFlag.DIR_RF].unionUpdate(other.edges[DirectionFlag.DIR_RF]);
+ edges[DirectionFlag.DIR_RR].unionUpdate(other.edges[DirectionFlag.DIR_RR]);
+ break;
+ }
+ }
+
/**
* merge my edge list (both kmers and readIDs) with those of `other`. Assumes that `other` is doing the flipping, if any.
*/
@@ -441,11 +467,11 @@
case DirectionFlag.DIR_RR:
newThisOffset = otherLength - K + 1;
// shift my offsets (other is prepended)
- for (PositionWritable p : startReads) {
- p.set(p.getMateId(), p.getReadId(), newThisOffset + p.getPosId());
+ for (PositionWritable p : other.startReads) {
+ startReads.append(p.getMateId(), p.getReadId(), newThisOffset + p.getPosId());
}
for (PositionWritable p : other.endReads) {
- p.set(p.getMateId(), p.getReadId(), newThisOffset + p.getPosId());
+ endReads.append(p.getMateId(), p.getReadId(), newThisOffset + p.getPosId());
}
break;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
index f086698..696755d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
@@ -217,8 +217,14 @@
//
// this.getNode().mergeEdges(mergeDir, node);
// }
- public void processUpdates(byte neighborToDeleteDir, NodeWritable node){
- this.getNode().mergeEdges(neighborToDeleteDir, node);
+ public void processUpdates(byte deleteDir, VKmerBytesWritable toDelete, byte updateDir, NodeWritable other){
+ this.getNode().updateEdges(deleteDir, toDelete, updateDir, other);
+ }
+
+ public void processMerges(byte mergeDir, NodeWritable node, int kmerSize){
+ KmerBytesWritable.setGlobalKmerLength(kmerSize);
+ mergeDir = (byte)(mergeDir & MessageFlag.DIR_MASK);
+ super.getNode().mergeWithNode(mergeDir, node);
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 5db3636..2377e2a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -364,6 +364,7 @@
incomingMsg = msgIterator.next();
if(getHeadFlag() != MessageFlag.IS_HEAD){
getVertexValue().setState(incomingMsg.getFlag());
+ this.activate();
} else{ /** already set up **/
/** if headMergeDir are not the same **/
getVertexValue().setState(MessageFlag.IS_HALT);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 87c4c76..86c3cf0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -36,9 +36,11 @@
byte meToNeighborDir = (byte) (inFlag & MessageFlag.DIR_MASK);
byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+ byte neighborToMergeDir = flipDirection(neighborToMeDir, incomingMsg.isFlip());
// getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
// neighborToMergeDir, incomingMsg.getNeighborEdge());
- getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getNode());
+ getVertexValue().processUpdates(neighborToMeDir, incomingMsg.getSourceVertexId(),
+ neighborToMergeDir, incomingMsg.getNode());
}
/**
@@ -83,11 +85,12 @@
getVertexValue().setState(state);
}
- byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
+// byte neighborToMergeDir = flipDirection(neighborToMeDir, msg.isFlip());
- getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
- neighborToMergeDir, msg.getNeighborEdge(),
- kmerSize, msg.getNode());
+ getVertexValue().processMerges(neighborToMeDir, msg.getNode(), kmerSize);
+// getVertexValue().processMerges(neighborToMeDir, msg.getSourceVertexId(),
+// neighborToMergeDir, msg.getNeighborEdge(),
+// kmerSize, msg.getNode());
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 2022976..8663320 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -214,8 +214,10 @@
selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
processMerge();
//head meets head, stop
- if(getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)
+ if(getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD){
+ getVertexValue().setState(MessageFlag.IS_HALT);
voteToHalt();
+ }
else
this.activate();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
index c64a5cf..4b0220f 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeTestSuite.java
@@ -6,16 +6,16 @@
public static Test suite() throws Exception {
String pattern ="PathMerge";
- String testSet[] = //{"2", "3", "4", "5", "6", "7", "8", "9"};
- {
+ String testSet[] = {"2", "3", "4", "5", "6", "7", "8", "9", "head_6", "head_7"};
+// {
// "SimplePath",
// "ThreeDuplicate",
// "head_6"
- "head_7"
+// "head_7"
// "CyclePath",
// "SelfPath"
// "TreePath"
- };
+// };
init(pattern, testSet);
BasicGraphCleanTestSuite testSuite = new BasicGraphCleanTestSuite();
return makeTestSuite(testSuite);