PathMerge aggregator is completed
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 7b82dc1..29183c5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -58,6 +58,7 @@
protected byte incomingEdgeDir = 0; //SplitRepeat
protected byte outgoingEdgeDir = 0; //SplitRepeat
+ protected HashMapWritable<ByteWritable, VLongWritable> counters = new HashMapWritable<ByteWritable, VLongWritable>();
/**
* initiate kmerSize, maxIteration
*/
@@ -698,6 +699,23 @@
outgoingEdgeDir = connectedTable[i][1];
}
+
+ /**
+ * increment the given statistics counter by one (a counter seen for the first time starts at 1)
+ */
+ public void updateStatisticsCounter(byte counterName){
+ ByteWritable counterNameWritable = new ByteWritable(counterName);
+ if(counters.containsKey(counterNameWritable))
+ counters.get(counterNameWritable).set(counters.get(counterNameWritable).get() + 1);
+ else
+ counters.put(counterNameWritable, new VLongWritable(1));
+ }
+
+ /**
+ * read the statistics counters
+ * @param conf the job configuration
+ * @return map from statistics counter code to its count
+ */
public static HashMapWritable<ByteWritable, VLongWritable> readStatisticsCounterResult(Configuration conf) {
try {
VertexValueWritable value = (VertexValueWritable) IterationUtils
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 57eb2f2..174fb84 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -4,19 +4,12 @@
import java.util.Iterator;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ByteWritable;
-import edu.uci.ics.genomix.pregelix.io.HashMapWritable;
import edu.uci.ics.genomix.pregelix.io.PathMergeMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VLongWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
@@ -73,7 +66,6 @@
private boolean nextHead;
private boolean prevHead;
- HashMapWritable<ByteWritable, VLongWritable> counters = new HashMapWritable<ByteWritable, VLongWritable>();
/**
* initiate kmerSize, maxIteration
*/
@@ -109,6 +101,8 @@
tmpValue.reset();
if(getSuperstep() > 1)
StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+ counters.clear();
+ getVertexValue().getCounters().clear();
}
protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
@@ -153,139 +147,108 @@
return false;
}
- /**
- * set statistics counter
- */
- public void updateStatisticsCounter(byte counterName){
- ByteWritable counterNameWritable = new ByteWritable(counterName);
- if(counters.containsKey(counterNameWritable))
- counters.get(counterNameWritable).set(counters.get(counterNameWritable).get() + 1);
- else
- counters.put(counterNameWritable, new VLongWritable(1));
- }
-
@Override
public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
initVertex();
- counters.clear();
- getVertexValue().getCounters().clear();
- if(getSuperstep() == 1){
-// if(getVertexId().toString().contains("AT") || getVertexId().toString().contains("GA")){
- updateStatisticsCounter(StatisticsCounter.MergedNodes);
-// updateStatisticsCounter(StatisticsCounter.MergedPaths);
- getVertexValue().setCounters(counters);
- activate();
-// }
- } else if(getSuperstep() == 2){
- if(getVertexId().toString().contains("AA")){
- updateStatisticsCounter(StatisticsCounter.MergedNodes);
- } else{
- updateStatisticsCounter(StatisticsCounter.MergedPaths);
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if (getSuperstep() == 2)
+ initState(msgIterator);
+ else if (getSuperstep() % 4 == 3){
+ outFlag |= headFlag;
+
+ outFlag |= MessageFlag.NO_MERGE;
+ setStateAsNoMerge();
+
+ // only PATH vertices are present. Find the IDs of my neighbors
+ curKmer.setAsCopy(getVertexId());
+
+ curHead = isNodeRandomHead(curKmer);
+
+ // headFlag and tailFlag indicate whether the node is at the beginning or end of a simple path.
+ // We prevent merging towards non-path nodes
+ hasNext = setNextInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHNEXT));
+ hasPrev = setPrevInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHPREV));
+ if (hasNext || hasPrev) {
+ if (curHead) {
+ if (hasNext && !nextHead) {
+ // compress this head to the forward tail
+ sendUpdateMsgToPredecessor();
+ } else if (hasPrev && !prevHead) {
+ // compress this head to the reverse tail
+ sendUpdateMsgToSuccessor();
+ }
+ }
+ else {
+ // I'm a tail
+ if (hasNext && hasPrev) {
+ if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
+ // tails on both sides, and I'm the "local minimum"
+ // compress me towards the tail in forward dir
+ sendUpdateMsgToPredecessor();
+ }
+ } else if (!hasPrev) {
+ // no previous node
+ if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
+ // merge towards tail in forward dir
+ sendUpdateMsgToPredecessor();
+ }
+ } else if (!hasNext) {
+ // no next node
+ if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
+ // merge towards tail in reverse dir
+ sendUpdateMsgToSuccessor();
+ }
+ }
+ }
}
- getVertexValue().setCounters(counters);
- voteToHalt();
- }
-// else{
-
-// updateStatisticsCounter(StatisticsCounter.MergedPaths);
-// getVertexValue().setCounters(counters);
-// voteToHalt();
-// }
-// if (getSuperstep() == 1)
-// startSendMsg();
-// else if (getSuperstep() == 2)
-// initState(msgIterator);
-// else if (getSuperstep() % 4 == 3){
-// outFlag |= headFlag;
-//
-// outFlag |= MessageFlag.NO_MERGE;
-// setStateAsNoMerge();
-//
-// // only PATH vertices are present. Find the ID's for my neighbors
-// curKmer.setAsCopy(getVertexId());
-//
-// curHead = isNodeRandomHead(curKmer);
-//
-// // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path.
-// // We prevent merging towards non-path nodes
-// hasNext = setNextInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHNEXT));
-// hasPrev = setPrevInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHPREV));
-// if (hasNext || hasPrev) {
-// if (curHead) {
-// if (hasNext && !nextHead) {
-// // compress this head to the forward tail
-// sendUpdateMsgToPredecessor();
-// } else if (hasPrev && !prevHead) {
-// // compress this head to the reverse tail
-// sendUpdateMsgToSuccessor();
-// }
-// }
-// else {
-// // I'm a tail
-// if (hasNext && hasPrev) {
-// if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
-// // tails on both sides, and I'm the "local minimum"
-// // compress me towards the tail in forward dir
-// sendUpdateMsgToPredecessor();
-// }
-// } else if (!hasPrev) {
-// // no previous node
-// if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
-// // merge towards tail in forward dir
-// sendUpdateMsgToPredecessor();
-// }
-// } else if (!hasNext) {
-// // no next node
-// if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
-// // merge towards tail in reverse dir
-// sendUpdateMsgToSuccessor();
-// }
-// }
-// }
-// }
-// this.activate();
-// }
-// else if (getSuperstep() % 4 == 0){
-// //update neighber
-// while (msgIterator.hasNext()) {
-// incomingMsg = msgIterator.next();
-// processUpdate();
-// if(isHaltNode())
-// voteToHalt();
-// else
-// this.activate();
-// }
-// } else if (getSuperstep() % 4 == 1){
-// //send message to the merge object and kill self
-// broadcastMergeMsg();
-// } else if (getSuperstep() % 4 == 2){
-// //merge tmpKmer
-// while (msgIterator.hasNext()) {
-// incomingMsg = msgIterator.next();
-// selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
-// /** process merge **/
-// processMerge();
-// // set statistics counter: MergedNodes
-// updateStatisticsCounter(StatisticsCounter.MergedNodes);
-// /** if it's a tandem repeat, which means detecting cycle **/
-// if(isTandemRepeat()){
-// for(byte d : DirectionFlag.values)
-// getVertexValue().getEdgeList(d).reset();
-// getVertexValue().setState(MessageFlag.IS_HALT);
-// // set statistics counter: TandemRepeats
-// updateStatisticsCounter(StatisticsCounter.TandemRepeats);
-// voteToHalt();
-// }/** head meets head, stop **/
-// else if((getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)){
-// getVertexValue().setState(MessageFlag.IS_HALT);
-// // set statistics counter: MergedPaths
-// updateStatisticsCounter(StatisticsCounter.MergedPaths);
-// voteToHalt();
-// }
-// else
-// this.activate();
-// }
-// }
+ this.activate();
+ }
+ else if (getSuperstep() % 4 == 0){
+ //update neighbor
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ processUpdate();
+ if(isHaltNode())
+ voteToHalt();
+ else
+ this.activate();
+ }
+ } else if (getSuperstep() % 4 == 1){
+ //send message to the merge object and kill self
+ broadcastMergeMsg();
+ } else if (getSuperstep() % 4 == 2){
+ //merge tmpKmer
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
+ /** process merge **/
+ processMerge();
+ // set statistics counter: Num_MergedNodes
+ updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
+ /** if it's a tandem repeat, which means detecting cycle **/
+ if(isTandemRepeat()){
+ for(byte d : DirectionFlag.values)
+ getVertexValue().getEdgeList(d).reset();
+ getVertexValue().setState(MessageFlag.IS_HALT);
+ // set statistics counter: Num_TandemRepeats
+ updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats);
+ getVertexValue().setCounters(counters);
+ voteToHalt();
+ }/** head meets head, stop **/
+ else if((getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)){
+ getVertexValue().setState(MessageFlag.IS_HALT);
+ // set statistics counter: Num_MergedPaths
+ updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
+ getVertexValue().setCounters(counters);
+ voteToHalt();
+ }
+ else{
+ getVertexValue().setCounters(counters);
+ this.activate();
+ }
+ }
+ }
}
public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 161a41e..8c5ebb0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -11,6 +11,8 @@
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
+import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
import edu.uci.ics.genomix.pregelix.util.VertexUtil;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -65,6 +67,10 @@
outgoingMsg.reset();
if(destVertexId == null)
destVertexId = new VKmerBytesWritable();
+ if(getSuperstep() > 1)
+ StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+ counters.clear();
+ getVertexValue().getCounters().clear();
}
@Override
@@ -75,18 +81,27 @@
if(getVertexValue().getKmerLength() <= length){
sendSettledMsgToNextNode();
deleteVertex(getVertexId());
+ //set statistics counter: Num_RemovedTips
+ updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+ getVertexValue().setCounters(counters);
}
}
else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
if(getVertexValue().getKmerLength() <= length){
-
sendSettledMsgToPrevNode();
deleteVertex(getVertexId());
+ //set statistics counter: Num_RemovedTips
+ updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+ getVertexValue().setCounters(counters);
}
}
else if(VertexUtil.isSingleVertex(getVertexValue())){
- if(getVertexValue().getKmerLength() <= length)
+ if(getVertexValue().getKmerLength() <= length){
deleteVertex(getVertexId());
+ //set statistics counter: Num_RemovedTips
+ updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+ getVertexValue().setCounters(counters);
+ }
}
}
else if(getSuperstep() == 2){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
index fa7ee42..962699c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
@@ -1,7 +1,29 @@
package edu.uci.ics.genomix.pregelix.type;
public class StatisticsCounter {
- public static final byte MergedNodes = 0b00 << 0;
- public static final byte MergedPaths = 0b01 << 0;
- public static final byte TandemRepeats = 0b10 << 0;
+ public static final byte Num_MergedNodes = 0b00 << 0;
+ public static final byte Num_MergedPaths = 0b01 << 0;
+ public static final byte Num_TandemRepeats = 0b10 << 0;
+ public static final byte Num_RemovedTips = 0b11 << 0;
+
+ /** Maps each statistics counter code to a human-readable label. */
+ public final static class COUNTER_CONTENT{
+ /**
+ * @param code one of the Num_* counter codes declared in StatisticsCounter
+ * @return the label for that code, or "" when the code is not recognized
+ */
+ public static String getContent(byte code){
+ switch(code){
+ case Num_MergedNodes:
+ return "num of merged nodes";
+ case Num_MergedPaths:
+ return "num of merged paths";
+ case Num_TandemRepeats:
+ return "num of tandem repeats";
+ case Num_RemovedTips:
+ return "num of removed tips";
+ }
+ return "";
+ }
+ }
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 315f3aa..f2c0501 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -148,6 +148,7 @@
private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(new GenomixJobConf(3), jobName);
job.setVertexClass(TipAddVertex.class);
+ job.setGlobalAggregatorClass(StatisticsAggregator.class);
job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
@@ -164,6 +165,7 @@
private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(new GenomixJobConf(3), jobName);
job.setVertexClass(TipRemoveVertex.class);
+ job.setGlobalAggregatorClass(StatisticsAggregator.class);
job.setVertexInputFormatClass(GraphCleanInputFormat.class);
job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
job.setDynamicVertexValueSize(true);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
index 4d53925..11c5fb5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
@@ -29,6 +29,7 @@
import edu.uci.ics.genomix.pregelix.io.VLongWritable;
import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
+import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -66,6 +67,16 @@
}
}
+ /** Print one "label: value" line per counter to stdout. */
+ public void outputCounters(HashMapWritable<ByteWritable, VLongWritable> counters){
+ StringBuilder output = new StringBuilder();
+ for(ByteWritable counterName : counters.keySet()){
+ output.append(StatisticsCounter.COUNTER_CONTENT.getContent(counterName.get()));
+ output.append(": ").append(counters.get(counterName).toString()).append("\n");
+ }
+ System.out.println(output);
+ }
+
@Test
public void test() throws Exception {
setUp();
@@ -73,8 +84,10 @@
for (Plan plan : plans) {
driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
- HashMapWritable<ByteWritable, VLongWritable> counters = BasicGraphCleanVertex.readStatisticsCounterResult(job.getConfiguration());
- System.out.println(counters.toString());
+// HashMapWritable<ByteWritable, VLongWritable> counters = BasicGraphCleanVertex.readStatisticsCounterResult(job.getConfiguration());
+// //output counters
+// outputCounters(counters);
+// System.out.println("");
}
compareResults();
tearDown();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
index 81dc9ba..33e0650 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
@@ -6,7 +6,7 @@
public static Test suite() throws Exception {
String pattern ="TipRemove";
- String testSet[] = {"SmallGenome", "FR_Tip", "RF_Tip"};
+ String testSet[] = {"FR_Tip"};//{"SmallGenome", "FR_Tip", "RF_Tip"};
init(pattern, testSet);
BasicGraphCleanTestSuite testSuite = new BasicGraphCleanTestSuite();
return makeTestSuite(testSuite);