debugging BubbleMergeVertex
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
index 9b8a03d..83686fb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/bubblemerge/BubbleMergeVertex.java
@@ -34,7 +34,14 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- outgoingMsg.reset();
+ if(incomingMsg == null)
+ incomingMsg = new MessageWritable();
+ if(outgoingMsg == null)
+ outgoingMsg = new MessageWritable();
+ else
+ outgoingMsg.reset(kmerSize);
+ if(destVertexId == null)
+ destVertexId = new VKmerBytesWritable();
}
public void sendBubbleAndMajorVertexMsgToMinorVertex(){
@@ -64,7 +71,24 @@
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked" })
+ public void aggregateBubbleNodesByMajorNode(Iterator<MessageWritable> msgIterator){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
+ receivedMsgList.clear();
+ receivedMsgList.add(incomingMsg);
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
+ }
+ else{
+ receivedMsgList.clear();
+ receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
+ receivedMsgList.add(incomingMsg);
+ receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
+ }
+ }
+ }
+
@Override
public void compute(Iterator<MessageWritable> msgIterator) {
initVertex();
@@ -82,20 +106,9 @@
}
}
} else if (getSuperstep() == 3){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- if(!receivedMsgMap.containsKey(incomingMsg.getStartVertexId())){
- receivedMsgList.clear();
- receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
- }
- else{
- receivedMsgList.clear();
- receivedMsgList.addAll(receivedMsgMap.get(incomingMsg.getStartVertexId()));
- receivedMsgList.add(incomingMsg);
- receivedMsgMap.put(incomingMsg.getStartVertexId(), (ArrayList<MessageWritable>)receivedMsgList.clone());
- }
- }
+ /** aggregate bubble nodes and grouped by major vertex **/
+ aggregateBubbleNodesByMajorNode(msgIterator);
+
for(VKmerBytesWritable prevId : receivedMsgMap.keySet()){
if(receivedMsgList.size() > 1){ // filter bubble
/** for each startVertex, sort the node by decreasing order of coverage **/
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 2ef9958..fb51e52 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -12,6 +12,7 @@
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeAddVertex;
import edu.uci.ics.genomix.pregelix.operator.bridgeremove.BridgeRemoveVertex;
import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleAddVertex;
+import edu.uci.ics.genomix.pregelix.operator.bubblemerge.BubbleMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P2ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
@@ -210,42 +211,32 @@
generateBubbleAddGraphJob("BubbleAddGraph", outputBase
+ "BubbleAddGraph.xml");
}
-//
-// private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
-// PregelixJob job = new PregelixJob(jobName);
-// job.setVertexClass(BubbleMergeVertex.class);
-// job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
-// job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
-// job.setDynamicVertexValueSize(true);
-// job.setOutputKeyClass(PositionWritable.class);
-// job.setOutputValueClass(VertexValueWritable.class);
-// job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 5);
-// job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
-// }
-//
-// private static void genBubbleMergeGraph() throws IOException {
-// generateBubbleMergeGraphJob("BubbleMergeGraph", outputBase
-// + "BubbleMergeGraph.xml");
-// }
+
+ private static void generateBubbleMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(BubbleMergeVertex.class);
+ job.setVertexInputFormatClass(GraphCleanInputFormat.class);
+ job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(VKmerBytesWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(BubbleMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genBubbleMergeGraph() throws IOException {
+ generateBubbleMergeGraphJob("BubbleMergeGraph", outputBase
+ + "BubbleMergeGraph.xml");
+ }
public static void main(String[] args) throws IOException {
- //genNaiveAlgorithmForMergeGraph();
-// genLogAlgorithmForMergeGraph();
- //genP3ForMergeGraph();
- //genTipAddGraph();
-// genTipRemoveGraph();
-// genBridgeAddGraph();
-// genBridgeRemoveGraph();
-// genBubbleAddGraph();
-// genBubbleMergeGraph();
-// genP4ForMergeGraph();
-// genMapReduceGraph();
genSplitRepeatGraph();
genTipAddGraph();
genBridgeAddGraph();
genTipRemoveGraph();
genBridgeRemoveGraph();
genBubbleAddGraph();
+ genBubbleMergeGraph();
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
index 57647ab..21020ce 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BubbleMergeSmallTestSuite.java
@@ -43,9 +43,9 @@
public class BubbleMergeSmallTestSuite extends TestSuite {
private static final Logger LOGGER = Logger.getLogger(BubbleMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/input"; //"graphbuildresult";
+ public static final String PreFix = "data/actual/bubbleadd/BubbleAddGraph/bin";
public static final String[] TestDir = { PreFix + File.separator
- + "graphs/bubblemerge/fr_bubble"};
+ + "5"};
private static final String ACTUAL_RESULT_DIR = "data/actual/bubblemerge";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";