Debug P4 and fix random seed issue
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 88cfb79..ae921c4 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -561,6 +561,8 @@
JobConf conf = new JobConf(baseConf);
conf.setJarByClass(MergePathsH4.class);
conf.setJobName("MergePathsH4 " + inputPath);
+
+ //another comment
FileInputFormat.addInputPaths(conf, inputPath);
Path outputPath = new Path(inputPath + ".h4merge.tmp");
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
index ec91767..a3ea666 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4Driver.java
@@ -128,6 +128,8 @@
return (path.matches(".*" + COMPLETE + "_i\\d+$") || path.matches(".*" + UPDATES + "_i\\d+$") || path.endsWith(lastMergeOutput));
}
};
+ // test comment
+
StringBuilder sb = new StringBuilder();
String delim = "";
for (FileStatus file : dfs.globStatus(new Path(inputGraphPath.replaceAll("/$", "") + "*"), updateFilter)) {
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
index 6729c78..4148982 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/pmcommon/NodeWithFlagWritable.java
@@ -112,6 +112,7 @@
throws IOException {
byte updateFlag = updateMsg.getFlag();
NodeWritable updateNode = updateMsg.getNode();
+
if ((updateFlag & MessageFlag.MSG_UPDATE_EDGE) == MessageFlag.MSG_UPDATE_EDGE) {
// this message wants to update the edges of node.
// remove position and merge its position lists with node
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index 9238f9d..162663f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -53,27 +53,32 @@
* get destination vertex
*/
public PositionWritable getNextDestVertexId(VertexValueWritable value) {
- if(value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
+ if (value.getFFList().getCountOfPosition() > 0){ // #FFList() > 0
posIterator = value.getFFList().iterator();
outFlag |= MessageFlag.DIR_FF;
- }
- else{ // #FRList() > 0
+ return posIterator.next();
+ } else if (value.getFRList().getCountOfPosition() > 0){ // #FRList() > 0
posIterator = value.getFRList().iterator();
outFlag |= MessageFlag.DIR_FR;
+ return posIterator.next();
+ } else {
+ return null;
}
- return posIterator.next();
+
}
public PositionWritable getPreDestVertexId(VertexValueWritable value) {
- if(value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
+ if (value.getRFList().getCountOfPosition() > 0){ // #RFList() > 0
posIterator = value.getRFList().iterator();
outFlag |= MessageFlag.DIR_RF;
- }
- else{ // #RRList() > 0
+ return posIterator.next();
+ } else if (value.getRRList().getCountOfPosition() > 0){ // #RRList() > 0
posIterator = value.getRRList().iterator();
outFlag |= MessageFlag.DIR_RR;
+ return posIterator.next();
+ } else {
+ return null;
}
- return posIterator.next();
}
/**
@@ -223,7 +228,7 @@
public void broadcastUpdateMsg(){
if((getVertexValue().getState() & MessageFlag.IS_HEAD) > 0)
outFlag |= MessageFlag.IS_HEAD;
- switch(getVertexValue().getState() & 0b0001){
+ switch(getVertexValue().getState() & MessageFlag.SHOULD_MERGE_MASK){
case MessageFlag.SHOULD_MERGEWITHPREV:
setSuccessorAdjMsg();
if(ifFlipWithPredecessor())
@@ -231,7 +236,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getIncomingList());
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
+ if(getNextDestVertexId(getVertexValue()) != null)
+ sendMsg(getNextDestVertexId(getVertexValue()), outgoingMsg);
break;
case MessageFlag.SHOULD_MERGEWITHNEXT:
setPredecessorAdjMsg();
@@ -240,7 +246,8 @@
outgoingMsg.setFlag(outFlag);
outgoingMsg.setNeighberNode(getVertexValue().getOutgoingList());
outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
+ if(getPreDestVertexId(getVertexValue()) != null)
+ sendMsg(getPreDestVertexId(getVertexValue()), outgoingMsg);
break;
}
}
@@ -324,7 +331,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgToPredecessor(){
+ public void sendUpdateMsgToPredecessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHNEXT;
getVertexValue().setState(state);
@@ -339,7 +346,7 @@
* This vertex tries to merge with next vertex and send update msg to neighber
* @throws IOException
*/
- public void sendUpMsgToSuccessor(){
+ public void sendUpdateMsgToSuccessor(){
byte state = getVertexValue().getState();
state |= MessageFlag.SHOULD_MERGEWITHPREV;
getVertexValue().setState(state);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index f250afc..ef699fc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -1,14 +1,10 @@
package edu.uci.ics.genomix.pregelix.operator.pathmerge;
-import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
-import org.apache.hadoop.io.NullWritable;
-
import edu.uci.ics.genomix.type.PositionWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.genomix.pregelix.client.Client;
import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
@@ -17,7 +13,6 @@
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.VertexUtil;
/*
* vertexId: BytesWritable
@@ -55,7 +50,7 @@
public static final String RANDSEED = "P4ForPathMergeVertex.randSeed";
public static final String PROBBEINGRANDOMHEAD = "P4ForPathMergeVertex.probBeingRandomHead";
- private static long randSeed = -1;
+ private static long randSeed = 1;
private float probBeingRandomHead = -1;
private Random randGenerator;
@@ -79,8 +74,9 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if (randSeed < 0)
- randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ //if (randSeed < 0)
+ // randSeed = getContext().getConfiguration().getLong("randomSeed", 0);
+ randSeed = getSuperstep();
randGenerator = new Random(randSeed);
if (probBeingRandomHead < 0)
probBeingRandomHead = getContext().getConfiguration().getFloat("probBeingRandomHead", 0.5f);
@@ -95,7 +91,9 @@
protected boolean isNodeRandomHead(PositionWritable nodeID) {
// "deterministically random", based on node id
- randGenerator.setSeed(randSeed ^ nodeID.hashCode());
+ //randGenerator.setSeed(randSeed);
+ //randSeed = randGenerator.nextInt();
+ randGenerator.setSeed((randSeed ^ nodeID.hashCode()) * 100000);//randSeed + nodeID.hashCode()
return randGenerator.nextFloat() < probBeingRandomHead;
}
@@ -157,36 +155,37 @@
hasPrev = setPrevInfo(getVertexValue()) && headFlag == 0;
if (hasNext || hasPrev) {
if (curHead) {
- if (hasNext && !nextHead) {
+ if (hasNext && !nextHead && (getNextDestVertexId(getVertexValue()) != null)) {
// compress this head to the forward tail
- sendUpMsgToPredecessor(); //TODO up -> update From -> to
- } else if (hasPrev && !prevHead) {
+ sendUpdateMsgToPredecessor(); //TODO up -> update From -> to
+ } else if (hasPrev && !prevHead && (getPreDestVertexId(getVertexValue()) != null)) {
// compress this head to the reverse tail
- sendUpMsgToSuccessor();
+ sendUpdateMsgToSuccessor();
}
} else {
// I'm a tail
if (hasNext && hasPrev) {
- if ((!nextHead && !prevHead) && (curID.compareTo(nextID) < 0 && curID.compareTo(prevID) < 0)) {
+ if ((!nextHead && !prevHead) && (curID.compareTo(nextID) > 0 && curID.compareTo(prevID) > 0)) {
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
- sendUpMsgToPredecessor();
+ sendUpdateMsgToPredecessor();
}
} else if (!hasPrev) {
// no previous node
- if (!nextHead && curID.compareTo(nextID) < 0) {
+ if (!nextHead && curID.compareTo(nextID) > 0) {
// merge towards tail in forward dir
- sendUpMsgToPredecessor();
+ sendUpdateMsgToPredecessor();
}
} else if (!hasNext) {
// no next node
- if (!prevHead && curID.compareTo(prevID) < 0) {
+ if (!prevHead && curID.compareTo(prevID) > 0) {
// merge towards tail in reverse dir
- sendUpMsgToSuccessor();
+ sendUpdateMsgToSuccessor();
}
}
}
}
+ voteToHalt();
}
else if (getSuperstep() % 4 == 0){
//update neighber
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index a63da20..5a113e0 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -18,6 +18,7 @@
import edu.uci.ics.genomix.pregelix.operator.pathmerge.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.pathmerge.P3ForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.P4ForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipAddVertex;
import edu.uci.ics.genomix.pregelix.operator.tipremove.TipRemoveVertex;
import edu.uci.ics.genomix.type.PositionWritable;
@@ -79,6 +80,23 @@
+ "P3ForMergeGraph.xml");
}
+ private static void generateP4ForMergeGraphJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(P4ForPathMergeVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(DataCleanOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(PositionWritable.class);
+ job.setOutputValueClass(VertexValueWritable.class);
+ job.getConfiguration().setInt(P4ForPathMergeVertex.KMER_SIZE, 3);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genP4ForMergeGraph() throws IOException {
+ generateP4ForMergeGraphJob("P4ForMergeGraph", outputBase
+ + "P4ForMergeGraph.xml");
+ }
+
private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(TipAddVertex.class);
@@ -183,7 +201,7 @@
public static void main(String[] args) throws IOException {
//genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
+ //genLogAlgorithmForMergeGraph();
//genP3ForMergeGraph();
//genTipAddGraph();
//genTipRemoveGraph();
@@ -191,6 +209,7 @@
//genBridgeRemoveGraph();
//genBubbleAddGraph();
//genBubbleMergeGraph();
+ genP4ForMergeGraph();
}
}