finish debug in p3
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index f849b21..e49063b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -11,6 +11,7 @@
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -41,6 +42,12 @@
@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
public String profiling = "false";
+
+ @Option(name = "-pseudo-rate", usage = "the rate of pseduHead", required = false)
+ public float pseudoRate = -1;
+
+ @Option(name = "-max-patitionround", usage = "max rounds in partition phase", required = false)
+ public int maxRound = -1;
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -64,7 +71,12 @@
if (options.numIteration > 0) {
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
}
+ if (options.pseudoRate > 0 && options.pseudoRate <= 1)
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+ if (options.maxRound > 0)
+ job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
return options;
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
index 96886de..e189800 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -54,11 +54,12 @@
Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
+ public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
+ public static final String MAXROUND = "P3ForPathMergeVertex.maxRound";
public static int kmerSize = -1;
private int maxIteration = -1;
- public static final double pseudoRatio = 0.2;
- public static int maxPseudoHeads = 1;
- public static int maxRound = 2;
+ public static float pseudoRate = -1;
+ public static int maxRound = -1;
private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
@@ -75,6 +76,10 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(pseudoRate < 0)
+ pseudoRate = getContext().getConfiguration().getFloat(PSEUDORATE, 0.2f);
+ if (maxRound < 0)
+ maxRound = getContext().getConfiguration().getInt(MAXROUND, 4);
outgoingMsg.reset();
}
@@ -149,17 +154,17 @@
}
} while (msgIterator.hasNext());
} else {
- /*double random = Math.random();
- if (random < pseudoRatio)
+ float random = (float) Math.random();
+ if (random < pseudoRate)
markPseudoHead();
else{
- getVertexValue().setState(State.NON);
+ getVertexValue().setState(State.NON_VERTEX);
voteToHalt();
- }*/
- if (getVertexId().toString().equals("CCTCA")) //AGTAC
+ }
+ /*if (getVertexId().toString().equals("CCTCA") || getVertexId().toString().equals("CTCAG")) //AGTAC CCTCA CTCAG CGCCC ACGCC
markPseudoHead();
else
- voteToHalt();
+ voteToHalt();*/
}
}
@@ -170,7 +175,8 @@
getVertexValue().setState(State.PSEUDOHEAD);
outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
destVertexId
- .set(getPreDestVertexId(getVertexId(), GeneCode.getGeneCodeFromBitMap(getVertexValue().getAdjMap())));
+ .set(getPreDestVertexId(getVertexId(),
+ GeneCode.getGeneCodeFromBitMap((byte) ((getVertexValue().getAdjMap() >> 4) & 0x0F))));
sendMsg(destVertexId, outgoingMsg);
}
@@ -238,9 +244,7 @@
.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
sendMsg(destVertexId, outgoingMsg);
} else {
- getVertexValue().setMergeChain(
- kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
+ mergeChainVertex();
byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
getVertexValue().setAdjMap(adjMap);
getVertexValue().setState(State.FINAL_VERTEX);
@@ -276,10 +280,7 @@
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
sendMsg(destVertexId, outgoingMsg);
- }
- else{
- outgoingMsg.setMessage(Message.FROMSELF);
- sendMsg(getVertexId(), outgoingMsg);
+ voteToHalt();
}
} else {
while (msgIterator.hasNext()) {
@@ -289,19 +290,23 @@
mergeChainVertex();
byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
getVertexValue().setAdjMap(adjMap);
- if (incomingMsg.getMessage() != Message.STOP && incomingMsg.getMessage() != Message.FROMREAR) {
+ if (incomingMsg.getMessage() != Message.STOP
+ && incomingMsg.getMessage() != Message.FROMPSEUDOREAR) {
outgoingMsg.setSourceVertexId(getVertexId());
destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
incomingMsg.getAdjMap()));
sendMsg(destVertexId, outgoingMsg);
+ voteToHalt();
} else {
//check head or pseudoHead
if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.FROMREAR) {
+ && incomingMsg.getMessage() == Message.STOP) {
getVertexValue().setState(State.FINAL_VERTEX);
//String source = getVertexValue().getMergeChain().toString();
//System.out.println();
- }
+ } else if(getVertexValue().getState() == State.PSEUDOHEAD
+ && incomingMsg.getMessage() == Message.STOP)
+ getVertexValue().setState(State.END_VERTEX);
}
}
}
@@ -312,46 +317,42 @@
* path node sends message back to head node in partition phase
*/
public void responseMsgToHeadVertexPartitionPhase() {
- if(incomingMsg.getMessage() == Message.FROMSELF){
- outgoingMsg.setMessage(Message.FROMSELF);
- sendMsg(getVertexId(), outgoingMsg);
- } else{
- if (getVertexValue().getState() == State.PSEUDOHEAD)
- outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
- else {
- deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getLengthOfMergeChain() == 0)
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- else
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- if (getVertexValue().getState() == State.PSEUDOREAR)
- outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
- else if (getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
- }
- sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
- }
+ if (getVertexValue().getState() == State.PSEUDOHEAD)
+ outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ else {
+ deleteVertex(getVertexId());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ if(getVertexValue().getLengthOfMergeChain() == 0)
+ outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ else
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ if (getVertexValue().getState() == State.PSEUDOREAR)
+ outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
+ else if (getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.STOP);
+ }
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ voteToHalt();
}
/**
* final process the result of partition phase
*/
public void finalProcessPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
- if(incomingMsg.getMessage() != Message.FROMSELF){
- while (msgIterator.hasNext()) {
- incomingMsg = msgIterator.next();
- mergeChainVertex();
- byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
- //check head or pseudoHead
- if (getVertexValue().getState() == State.START_VERTEX
- && incomingMsg.getMessage() == Message.FROMREAR) {
- getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
- //System.out.println();
- }
- }
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ mergeChainVertex();
+ byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ //check head or pseudoHead
+ if (getVertexValue().getState() == State.START_VERTEX
+ && incomingMsg.getMessage() == Message.STOP) {
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ } else if(getVertexValue().getState() == State.PSEUDOHEAD
+ && incomingMsg.getMessage() == Message.STOP)
+ getVertexValue().setState(State.END_VERTEX);
}
}
/**
@@ -379,13 +380,11 @@
}
} else if (getSuperstep() % 2 == 0 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
sendMsgToPathVertexPartitionPhase(msgIterator);
- voteToHalt();
} else if (getSuperstep() % 2 == 1 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) {
while (msgIterator.hasNext()) {
incomingMsg = msgIterator.next();
responseMsgToHeadVertexPartitionPhase();
}
- voteToHalt();
} else if(getSuperstep() == 3 + 2 * maxRound + 1 && getSuperstep() <= maxIteration){
finalProcessPartitionPhase(msgIterator);
} else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index 21d732e..fa5f73b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -8,8 +8,7 @@
public static final byte STOP = 3;
public static final byte FROMPSEUDOHEAD = 4;
public static final byte FROMPSEUDOREAR = 5;
- public static final byte FROMREAR = 6;
- public static final byte FROMSELF = 7;
+ public static final byte FROMSELF = 6;
public final static class MESSAGE_CONTENT {
@@ -34,9 +33,6 @@
case FROMPSEUDOREAR:
r = "FROMPSEUDOREAR";
break;
- case FROMREAR:
- r = "FROMREAR";
- break;
case FROMSELF:
r = "FROMSELF";
break;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index 306e083..c8aaa84 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -50,7 +50,7 @@
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
+ "TreePath"};*/
- /*+ "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
+ /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",