Refactor removeLowCoverage
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index e7b0509..ffcb449 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -114,6 +114,7 @@
RR((byte)(0b11 << 0));
public static final byte MASK = (byte)(0b11 << 0);
+ public static final byte CLEAR = (byte)(0b1111100 << 0);
private final byte val;
private EDGETYPE(byte val){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 8c64cea..1b1d748 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -560,22 +560,6 @@
return null;
}
- public boolean isTandemRepeat(VertexValueWritable value){
- VKmerBytesWritable kmerToCheck;
- for(EDGETYPE et : EnumSet.allOf(EDGETYPE.class)){
- Iterator<VKmerBytesWritable> it = value.getEdgeList(et).getKeyIterator();
- while(it.hasNext()){
- kmerToCheck = it.next();
- if(kmerToCheck.equals(getVertexId())){
- repeatEdgetype = et;
- repeatKmer.setAsCopy(kmerToCheck);
- return true;
- }
- }
- }
- return false;
- }
-
/**
* set statistics counter
*/
@@ -689,7 +673,7 @@
//2013.9.21 ------------------------------------------------------------------//
/**
- * get destination vertex
+ * get destination vertex ex. RemoveTip
*/
public VKmerBytesWritable getDestVertexId(DIR direction){
int degree = getVertexValue().getDegree(direction);
@@ -706,4 +690,40 @@
//degree in this direction == 0
throw new IllegalArgumentException("degree > 0, getDestVertexId(DIR direction) only can use for degree == 1 + \n" + getVertexValue().toString());
}
+
+ /**
+ * check if I am a tandemRepeat
+ */
+ public boolean isTandemRepeat(VertexValueWritable value){
+ VKmerBytesWritable kmerToCheck;
+ for(EDGETYPE et : EnumSet.allOf(EDGETYPE.class)){
+ Iterator<VKmerBytesWritable> it = value.getEdgeList(et).getKeyIterator();
+ while(it.hasNext()){
+ kmerToCheck = it.next();
+ if(kmerToCheck.equals(getVertexId())){
+ repeatEdgetype = et;
+ repeatKmer.setAsCopy(kmerToCheck);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * broadcastKillself ex. RemoveLow
+ */
+ public void broadcastKillself(){
+ VertexValueWritable vertex = getVertexValue();
+ for(EDGETYPE et : EnumSet.allOf(EDGETYPE.class)){
+ for(VKmerBytesWritable kmer : vertex.getEdgeList(et).getKeys()){
+ outFlag &= EDGETYPE.CLEAR;
+ outFlag |= et.mirror().get();
+ outgoingMsg.setFlag(outFlag);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId = kmer;
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
index cef3131..a2dedbb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/removelowcoverage/RemoveLowCoverageVertex.java
@@ -5,12 +5,14 @@
import java.util.Iterator;
import java.util.Set;
+import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.message.MessageWritable;
import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
+import edu.uci.ics.genomix.type.NodeWritable.EDGETYPE;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
/**
@@ -29,7 +31,9 @@
*/
@Override
public void initVertex() {
- super.initVertex();
+ super.initVertex();
+ if (minAverageCoverage < 0)
+ minAverageCoverage = Float.parseFloat(getContext().getConfiguration().get(GenomixJobConf.REMOVE_LOW_COVERAGE_MAX_COVERAGE));
if(incomingMsg == null)
incomingMsg = new MessageWritable();
if(outgoingMsg == null)
@@ -38,11 +42,6 @@
outgoingMsg.reset();
if(destVertexId == null)
destVertexId = new VKmerBytesWritable();
- if(fakeVertex == null){
- fakeVertex = new VKmerBytesWritable();
- String random = generaterRandomString(kmerSize + 1);
- fakeVertex.setByRead(kmerSize + 1, random.getBytes(), 0);
- }
if(getSuperstep() == 1)
StatisticsAggregator.preGlobalCounters.clear();
// else
@@ -51,32 +50,42 @@
getVertexValue().getCounters().clear();
}
+ public void lowCoverageVertexBroadcaseKillself(){
+ if(getVertexValue().getAvgCoverage() <= minAverageCoverage){
+ //broadcase kill self
+ broadcastKillself();
+ deadNodeSet.add(new VKmerBytesWritable(getVertexId()));
+ }
+ }
+
+ public void cleanupDeadVertex(){
+ deleteVertex(getVertexId());
+ //set statistics counter: Num_RemovedLowCoverageNodes
+ updateStatisticsCounter(StatisticsCounter.Num_RemovedLowCoverageNodes);
+ getVertexValue().setCounters(counters);
+ }
+
+ public void responseToDeadVertex(Iterator<MessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ //response to dead node
+ EDGETYPE deadToMeEdgetype = EDGETYPE.fromByte(incomingMsg.getFlag());
+ getVertexValue().getEdgeList(deadToMeEdgetype).remove(incomingMsg.getSourceVertexId());
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
- if(getSuperstep() == 1){
- if(getVertexValue().getAvgCoverage() <= minAverageCoverage){
- broadcaseReallyKillself();
- deadNodeSet.add(new VKmerBytesWritable(getVertexId()));
- }
+ if(getSuperstep() == 1)
+ lowCoverageVertexBroadcaseKillself();
+ else if(getSuperstep() == 2){
+ if(deadNodeSet.contains(getVertexId()))
+ cleanupDeadVertex();
else
- voteToHalt();
- } else if(getSuperstep() == 2){
- if(deadNodeSet.contains(getVertexId())){
- deleteVertex(getVertexId());
- //set statistics counter: Num_RemovedLowCoverageNodes
- updateStatisticsCounter(StatisticsCounter.Num_RemovedLowCoverageNodes);
- getVertexValue().setCounters(counters);
- }
- else{
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- if(isResponseKillMsg())
- responseToDeadVertex();
- }
- }
- voteToHalt();
+ responseToDeadVertex(msgIterator);
}
+ voteToHalt();
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 576d0da..3da5d94 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -82,42 +82,6 @@
}
}
voteToHalt();
-
-// if(getSuperstep() == 1){
-// if(VertexUtil.isIncomingTipVertex(getVertexValue())){
-// if(getVertexValue().getKmerLength() <= length){
-// sendSettledMsgToNextNode();
-// deleteVertex(getVertexId());
-// //set statistics counter: Num_RemovedTips
-// updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
-// getVertexValue().setCounters(counters);
-// }
-// }
-// else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
-// if(getVertexValue().getKmerLength() <= length){
-// sendSettledMsgToPrevNode();
-// deleteVertex(getVertexId());
-// //set statistics counter: Num_RemovedTips
-// updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
-// getVertexValue().setCounters(counters);
-// }
-// }
-// else if(VertexUtil.isSingleVertex(getVertexValue())){
-// if(getVertexValue().getKmerLength() <= length){
-// deleteVertex(getVertexId());
-// //set statistics counter: Num_RemovedTips
-// updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
-// getVertexValue().setCounters(counters);
-// }
-// }
-// }
-// else if(getSuperstep() == 2){
-// while(msgIterator.hasNext()){
-// incomingMsg = msgIterator.next();
-// responseToDeadVertex();
-// }
-// }
-// voteToHalt();
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
index 084712a..390e5fe 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/unrolltandemrepeat/UnrollTandemRepeat.java
@@ -21,6 +21,7 @@
*/
public class UnrollTandemRepeat extends
BasicGraphCleanVertex<VertexValueWritable, MessageWritable>{
+
private EdgeWritable tmpEdge = new EdgeWritable();
/**
@@ -52,13 +53,13 @@
tmpValue.setAsCopy(getVertexValue());
tmpValue.getEdgeList(repeatEdgetype).remove(repeatKmer);
boolean hasFlip = false;
- /** pick one edge and flip **/
+ // pick one edge and flip
for(EDGETYPE et : EnumSet.allOf(EDGETYPE.class)){
for(EdgeWritable edge : tmpValue.getEdgeList(et)){
EDGETYPE flipDir = et.flip();
tmpValue.getEdgeList(flipDir).add(edge);
tmpValue.getEdgeList(et).remove(edge);
- /** setup hasFlip to go out of the loop **/
+ // setup hasFlip to go out of the loop
hasFlip = true;
break;
}
@@ -117,8 +118,8 @@
if(getSuperstep() == 1){
if(isTandemRepeat(getVertexValue()) && repeatCanBeMerged()){
mergeTandemRepeat();
- //set statistics counter: Num_RemovedTips
- updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+ //set statistics counter: Num_TandemRepeats
+ updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats);
getVertexValue().setCounters(counters);
}
voteToHalt();