Merge branch 'genomix/velvet_graphbuilding' into genomix/fullstack_genomix
Conflicts:
genomix/genomix-data/.classpath
genomix/genomix-data/.project
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingMapper.java
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/DeepGraphBuildingReducer.java
genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/valvetgraphbuilding/GraphInvertedIndexBuildingMapper.java
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index f849b21..4014377 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -11,6 +11,7 @@
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
@@ -41,6 +42,12 @@
@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
public String profiling = "false";
+
+ @Option(name = "-pseudo-rate", usage = "the rate of pseduHead", required = false)
+ public float pseudoRate = -1;
+
+ @Option(name = "-max-patitionround", usage = "max rounds in partition phase", required = false)
+ public int maxRound = -1;
}
public static void run(String[] args, PregelixJob job) throws Exception {
@@ -61,10 +68,18 @@
FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.KMER_SIZE, options.sizeKmer);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, options.sizeKmer);
if (options.numIteration > 0) {
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
job.getConfiguration().setInt(LogAlgorithmForPathMergeVertex.ITERATIONS, options.numIteration);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.ITERATIONS, options.numIteration);
}
+
+ if (options.pseudoRate > 0 && options.pseudoRate <= 1)
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, options.pseudoRate);
+ if (options.maxRound > 0)
+ job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, options.maxRound);
return options;
+
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 68d70ad..110247e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -37,8 +37,8 @@
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex)
throws IOException, InterruptedException {
- if (vertex.getVertexValue().getState() != State.END_VERTEX
- && vertex.getVertexValue().getState() != State.MID_VERTEX) {
+ //&& vertex.getVertexValue().getState() != State.MID_VERTEX
+ if (vertex.getVertexValue().getState() != State.END_VERTEX) {
getRecordWriter().write(vertex.getVertexId(), vertex.getVertexValue());
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
index f9574a4..c5378a2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/NaiveAlgorithmMessageWritable.java
@@ -21,19 +21,21 @@
private KmerBytesWritable sourceVertexId;
private byte adjMap;
private byte lastGeneCode;
+ private VKmerBytesWritable chainVertexId;
private byte message;
private byte checkMessage;
public NaiveAlgorithmMessageWritable() {
sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(1); // reusable instance; set()/setChainVertexId() fill it in place
adjMap = (byte) 0;
- lastGeneCode = (byte) 0;
+ lastGeneCode = (byte) -1; // -1 is the value setLastGeneCode() treats as "no gene code"
message = Message.NON;
checkMessage = (byte) 0;
}
- public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, byte message) {
+ public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, VKmerBytesWritable chainVertexId, byte message) {
checkMessage = 0;
if (sourceVertexId != null) {
checkMessage |= CheckMessage.SOURCE;
@@ -47,13 +49,18 @@
checkMessage |= CheckMessage.LASTGENECODE;
this.lastGeneCode = lastGeneCode;
}
+ if (chainVertexId != null) {
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
this.message = message;
}
public void reset() {
checkMessage = 0;
adjMap = (byte) 0;
- lastGeneCode = (byte) 0;
+ lastGeneCode = (byte) -1; // back to the "no gene code" value used by the constructor
+ chainVertexId.reset(1); // same argument the constructor passes when creating the chain
message = Message.NON;
}
@@ -84,12 +91,27 @@
}
public void setLastGeneCode(byte lastGeneCode) {
- if (lastGeneCode != 0) {
+ if (lastGeneCode != -1) { // -1 means "unset": neither the LASTGENECODE flag nor the field is touched
checkMessage |= CheckMessage.LASTGENECODE;
this.lastGeneCode = lastGeneCode;
}
}
+ public VKmerBytesWritable getChainVertexId() {
+ return chainVertexId; // hands out the internal instance, not a copy
+ }
+
+ public void setChainVertexId(VKmerBytesWritable chainVertexId) {
+ if (chainVertexId != null) { // a null argument leaves both the stored chain and the CHAIN flag unchanged
+ checkMessage |= CheckMessage.CHAIN; // write()/readFields() only (de)serialize the chain when this bit is set
+ this.chainVertexId.set(chainVertexId); // updates the existing instance through set(...) instead of replacing the reference
+ }
+ }
+
+ public int getLengthOfChain() {
+ return chainVertexId.getKmerLength(); // length as reported by the stored chain kmer
+ }
+
public byte getMessage() {
return message;
}
@@ -98,6 +120,10 @@
this.message = message;
}
+ public boolean isGeneCode(){
+ return ((checkMessage & CheckMessage.LASTGENECODE) != 0); // true when the LASTGENECODE bit of checkMessage is set
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(checkMessage);
@@ -107,6 +133,8 @@
out.write(adjMap);
if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
out.write(lastGeneCode);
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.write(out);
out.write(message);
}
@@ -120,6 +148,8 @@
adjMap = in.readByte();
if ((checkMessage & CheckMessage.LASTGENECODE) != 0)
lastGeneCode = in.readByte();
+ if ((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.readFields(in);
message = in.readByte();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
new file mode 100644
index 0000000..44d47a0
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P3ForPathMergeVertex.java
@@ -0,0 +1,418 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.VertexUtil;
+
+/*
+ * vertexId: KmerBytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The message format is defined in edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable.
+ */
+/**
+ * P3 algorithm for path merge graph: a partition phase driven by pseudo-heads, followed by a merge phase
+ */
+public class P3ForPathMergeVertex extends
+ Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable> {
+ public static final String KMER_SIZE = "P3ForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "P3ForPathMergeVertex.iteration";
+ public static final String PSEUDORATE = "P3ForPathMergeVertex.pseudoRate";
+ public static final String MAXROUND = "P3ForPathMergeVertex.maxRound";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+ public static float pseudoRate = -1;
+ public static int maxRound = -1;
+
+ private NaiveAlgorithmMessageWritable incomingMsg = new NaiveAlgorithmMessageWritable();
+ private NaiveAlgorithmMessageWritable outgoingMsg = new NaiveAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+
+ /**
+ * Load kmerSize, maxIteration, pseudoRate and maxRound from the job configuration while they are still unset, then reset outgoingMsg.
+ */
+ public void initVertex() {
+ if (kmerSize == -1) // static field: read from the configuration only while still at its initial value
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ if(pseudoRate < 0)
+ pseudoRate = getContext().getConfiguration().getFloat(PSEUDORATE, 0.2f);
+ if (maxRound < 0)
+ maxRound = getContext().getConfiguration().getInt(MAXROUND, 2);
+ outgoingMsg.reset(); // compute() calls initVertex() first, so every compute() call starts from a cleared outgoing message
+ }
+
+ /**
+ * Build the id of the next-node neighbour by shifting vertexId with geneCode (kmerFactory.shiftKmerWithNextCode).
+ */
+ public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode) {
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode) { // counterpart of getDestVertexId for previous-node neighbours
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap) { // next-node id computed from the tail of a merged chain
+ VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId); // note: this local shadows the lastKmer field
+ return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F))); // low four bits of adjMap hold the next-node flags
+ }
+
+ /**
+ * Send outgoingMsg to every next node whose bit is set in the low four bits of adjMap.
+ */
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if ((adjMap & (1 << x)) != 0) { // bit x set: there is a next node reached through gene code x
+ destVertexId.set(getDestVertexId(vertexId, x));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ }
+
+ /**
+ * Send outgoingMsg to every previous node whose bit is set in the high four bits of adjMap.
+ */
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if (((adjMap >> 4) & (1 << x)) != 0) { // shift the previous-node flags down before testing bit x
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ }
+
+ /**
+ * Superstep 1: head vertices send START to all next nodes, rear vertices send END to all previous nodes; both then vote to halt.
+ */
+ public void startSendMsg() {
+ if (VertexUtil.isHeadVertex(getVertexValue().getAdjMap())) {
+ outgoingMsg.setMessage(Message.START);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ if (VertexUtil.isRearVertex(getVertexValue().getAdjMap())) { // independent check, not an else: both branches run if both predicates hold
+ outgoingMsg.setMessage(Message.END);
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ }
+
+ /**
+ * Superstep 2: classify the vertex from the START/END messages it received, or randomly promote it to pseudo-head when it received none.
+ */
+ public void initState(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ if (msgIterator.hasNext()) {
+ do {
+ if (!VertexUtil.isPathVertex(getVertexValue().getAdjMap())) { // non-path vertex: consume the message and halt
+ msgIterator.next();
+ voteToHalt();
+ } else {
+ incomingMsg = msgIterator.next();
+ setState(); // START -> START_VERTEX, END -> END_VERTEX (see setState)
+ }
+ } while (msgIterator.hasNext());
+ } else { // no message arrived for this vertex
+ float random = (float) Math.random();
+ if (random < pseudoRate) // becomes a pseudo-head with probability pseudoRate
+ markPseudoHead();
+ else{
+ getVertexValue().setState(State.NON_VERTEX);
+ voteToHalt();
+ }
+ /*if (getVertexId().toString().equals("CCTCA") || getVertexId().toString().equals("CTCAG")) //AGTAC CCTCA CTCAG CGCCC ACGCC
+ markPseudoHead();
+ else
+ voteToHalt();*/
+ }
+ }
+
+ /**
+ * Set this vertex's state to PSEUDOHEAD and send FROMPSEUDOHEAD to the previous node derived from the high four bits of its adjMap.
+ */
+ public void markPseudoHead() {
+ getVertexValue().setState(State.PSEUDOHEAD);
+ outgoingMsg.setMessage(Message.FROMPSEUDOHEAD);
+ destVertexId
+ .set(getPreDestVertexId(getVertexId(),
+ GeneCode.getGeneCodeFromBitMap((byte) ((getVertexValue().getAdjMap() >> 4) & 0x0F)))); // presumably assumes a single previous-node bit is set - TODO confirm
+ sendMsg(destVertexId, outgoingMsg);
+ }
+
+ /**
+ * React to a FROMPSEUDOHEAD message: a START_VERTEX becomes START_HALT, any other vertex becomes PSEUDOREAR and halts; other messages are ignored.
+ */
+ public void markPseudoRear() {
+ if (incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ && getVertexValue().getState() != State.START_VERTEX) {
+ getVertexValue().setState(State.PSEUDOREAR);
+ voteToHalt();
+ }
+ else if(incomingMsg.getMessage() == Message.FROMPSEUDOHEAD
+ && getVertexValue().getState() == State.START_VERTEX){
+ getVertexValue().setState(State.START_HALT); // no voteToHalt() here; resetState() later turns START_HALT back into START_VERTEX
+ }
+ }
+
+ /**
+ * Update the vertex state from incomingMsg: START marks START_VERTEX; END marks END_VERTEX and halts unless already START_VERTEX; anything else halts.
+ */
+ public void setState() {
+ if (incomingMsg.getMessage() == Message.START) {
+ getVertexValue().setState(State.START_VERTEX); // stays active: no voteToHalt() on this branch
+ } else if (incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX) {
+ getVertexValue().setState(State.END_VERTEX);
+ voteToHalt();
+ } else
+ voteToHalt();
+ }
+
+ /**
+ * Extend this vertex's merge chain with the payload of incomingMsg: either a single last gene code or the tail of the sender's chain.
+ */
+ public void mergeChainVertex(){
+ if(incomingMsg.isGeneCode() == true){ // message carries a last gene code: append that one code
+ getVertexValue().setMergeChain(
+ kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+ incomingMsg.getLastGeneCode()));
+ }
+ else{ // otherwise the message is expected to carry a chain
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId())); // presumably drops the (kmerSize - 1)-long overlap with the current chain - TODO confirm
+ getVertexValue().setMergeChain(
+ kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ lastKmer));
+ }
+ }
+
+ /**
+ * Merge phase, head side: send the first request at superstep 3 + 2 * maxRound + 2, afterwards fold each reply into the merge chain and continue or finish.
+ */
+ public void sendMsgToPathVertexMergePhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ if (getSuperstep() == 3 + 2 * maxRound + 2) { // first merge-phase superstep: nothing to merge yet, just ask the next node
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ sendMsg(destVertexId, outgoingMsg);
+ } else {
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ if (incomingMsg.getMessage() != Message.STOP) { // more path to follow: merge, then query the node after the sender
+ mergeChainVertex();
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId
+ .set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), incomingMsg.getAdjMap()));
+ sendMsg(destVertexId, outgoingMsg);
+ } else { // STOP: merge the last piece and finish
+ mergeChainVertex();
+ byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ }
+ }
+ }
+ }
+
+ /**
+ * Merge phase, path side: delete this vertex and reply to the sender of incomingMsg with its adjMap plus its last gene code or its merge chain.
+ */
+ public void responseMsgToHeadVertexMergePhase() {
+ deleteVertex(getVertexId());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ if(getVertexValue().getLengthOfMergeChain() == 0) // nothing merged here yet: the last gene code of the id is enough
+ outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ else
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ if (getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.STOP); // tells the head that the path ends at this vertex
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ }
+
+ /**
+ * Partition phase, head side: at superstep 4 seed the merge chain and send the first request; afterwards fold each reply into the chain and continue or stop.
+ */
+ public void sendMsgToPathVertexPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ if (getSuperstep() == 4) { // first partition round
+ getVertexValue().setMergeChain(getVertexId()); // the merge chain starts as this vertex's own id
+ if(getVertexValue().getState() != State.START_HALT){ // START_HALT vertices seed the chain but send nothing
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(), getVertexValue().getAdjMap()));
+ sendMsg(destVertexId, outgoingMsg);
+ voteToHalt();
+ }
+ } else {
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ //if from pseudoHead, voteToHalt(), otherwise ...
+ if (incomingMsg.getMessage() != Message.FROMPSEUDOHEAD){ // FROMPSEUDOHEAD replies are skipped entirely
+ mergeChainVertex();
+ byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ if (incomingMsg.getMessage() != Message.STOP
+ && incomingMsg.getMessage() != Message.FROMPSEUDOREAR) { // neither terminator seen: keep walking the path
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+ incomingMsg.getAdjMap()));
+ sendMsg(destVertexId, outgoingMsg);
+ voteToHalt();
+ } else {
+ //check head or pseudoHead
+ if (getVertexValue().getState() == State.START_VERTEX
+ && incomingMsg.getMessage() == Message.STOP) { // a real head reached the real end of its path
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ } else if(getVertexValue().getState() == State.PSEUDOHEAD
+ && incomingMsg.getMessage() == Message.STOP) // a pseudo-head that reached the end becomes END_VERTEX
+ getVertexValue().setState(State.END_VERTEX);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Partition phase, path side: reply to the sender of incomingMsg; a pseudo-head only announces itself, any other vertex deletes itself and hands over its data.
+ */
+ public void responseMsgToHeadVertexPartitionPhase() {
+ if (getVertexValue().getState() == State.PSEUDOHEAD)
+ outgoingMsg.setMessage(Message.FROMPSEUDOHEAD); // no adjMap/chain payload and no deleteVertex() on this branch
+ else {
+ deleteVertex(getVertexId());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ if(getVertexValue().getLengthOfMergeChain() == 0) // nothing merged here yet: send only the last gene code of the id
+ outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ else
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ if (getVertexValue().getState() == State.PSEUDOREAR)
+ outgoingMsg.setMessage(Message.FROMPSEUDOREAR);
+ else if (getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.STOP);
+ }
+ sendMsg(incomingMsg.getSourceVertexId(), outgoingMsg);
+ voteToHalt();
+ }
+
+ /**
+ * Last partition-phase step: fold every remaining reply into the merge chain and adjMap, and settle the state when the reply is STOP.
+ */
+ public void finalProcessPartitionPhase(Iterator<NaiveAlgorithmMessageWritable> msgIterator){
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ mergeChainVertex(); // unlike sendMsgToPathVertexPartitionPhase, there is no FROMPSEUDOHEAD filter before merging
+ byte adjMap = VertexUtil.updateRightNeighber(getVertexValue().getAdjMap(), incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ //check head or pseudoHead
+ if (getVertexValue().getState() == State.START_VERTEX
+ && incomingMsg.getMessage() == Message.STOP) { // a real head reached the end of its path
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ } else if(getVertexValue().getState() == State.PSEUDOHEAD
+ && incomingMsg.getMessage() == Message.STOP) // a pseudo-head that reached the end becomes END_VERTEX
+ getVertexValue().setState(State.END_VERTEX);
+ }
+ }
+ /**
+ * After the partition phase, reset temporary states: PSEUDOHEAD and PSEUDOREAR -> NON_VERTEX, START_HALT -> START_VERTEX.
+ */
+ public void resetState() {
+ if (getVertexValue().getState() == State.PSEUDOHEAD || getVertexValue().getState() == State.PSEUDOREAR) {
+ getVertexValue().setState(State.NON_VERTEX);
+ }
+ if(getVertexValue().getState() == State.START_HALT) // heads parked by markPseudoRear() become regular heads again
+ getVertexValue().setState(State.START_VERTEX);
+ }
+
+ @Override
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) { // dispatches on the superstep number; every branch past superstep 3 is also capped by maxIteration
+ initVertex();
+ if (getSuperstep() == 1) // heads send START, rears send END
+ startSendMsg();
+ else if (getSuperstep() == 2) // classify vertices, pick random pseudo-heads
+ initState(msgIterator);
+ else if (getSuperstep() == 3) {
+ if (msgIterator.hasNext()) { // only the first message is examined
+ incomingMsg = msgIterator.next();
+ markPseudoRear();
+ }
+ } else if (getSuperstep() % 2 == 0 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) { // partition phase, even supersteps: head side
+ sendMsgToPathVertexPartitionPhase(msgIterator);
+ } else if (getSuperstep() % 2 == 1 && getSuperstep() <= 3 + 2 * maxRound && getSuperstep() <= maxIteration) { // partition phase, odd supersteps: path side replies
+ while (msgIterator.hasNext()) { // NOTE(review): outgoingMsg is reset once per compute(), not between replies - confirm this is intended
+ incomingMsg = msgIterator.next();
+ responseMsgToHeadVertexPartitionPhase();
+ }
+ } else if(getSuperstep() == 3 + 2 * maxRound + 1 && getSuperstep() <= maxIteration){ // one extra superstep to absorb the last partition replies
+ finalProcessPartitionPhase(msgIterator);
+ } else if (getSuperstep() % 2 == 1 && getSuperstep() <= maxIteration) { // merge phase, odd supersteps: head side
+ resetState();
+ if(getVertexValue().getState() == State.START_VERTEX) // only START_VERTEX vertices drive the merge
+ sendMsgToPathVertexMergePhase(msgIterator);
+ voteToHalt();
+ } else if (getSuperstep() % 2 == 0 && getSuperstep() <= maxIteration) { // merge phase, even supersteps: path side replies
+ while (msgIterator.hasNext()) {
+ incomingMsg = msgIterator.next();
+ responseMsgToHeadVertexMergePhase();
+ }
+ voteToHalt();
+ } else // past maxIteration
+ voteToHalt();
+ }
+
+ public static void main(String[] args) throws Exception { // command-line entry point: configures a PregelixJob for this vertex class and hands it to Client.run
+ PregelixJob job = new PregelixJob(P3ForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P3ForPathMergeVertex.class);
+ /**
+ * Input and output reuse the NaiveAlgorithmForPathMerge formats
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index 85649b3..bb288ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -2,11 +2,13 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,15 +48,15 @@
}
public static void main(String[] args) throws IOException {
- /*Path dir = new Path("data/test8m");
- Path outDir = new Path("data/input/test");
- FileUtils.cleanDirectory(new File("data/input/test"));
+ Path dir = new Path("data/split.aa");
+ Path outDir = new Path("data/input");
+ FileUtils.cleanDirectory(new File("data/input")); // empties data/input before the new sample file is written
Path inFile = new Path(dir, "part-0");
- Path outFile = new Path(outDir, "part-0-out-100");
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100);*/
- String inFile = "data/shortjump_1.head8M.fastq";
+ Path outFile = new Path(outDir, "part-0-out-1000");
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 1000);
+ /* String inFile = "data/shortjump_1.head8M.fastq";
String outFile = "data/testGeneFile";
- generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);
+ generateNumOfLinesFromGraphBuildResuiltBigFile(inFile, outFile, 100000);*/
}
public static String readTextFile(String fileName, int numOfLines) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index 4332471..fa5f73b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -6,7 +6,9 @@
public static final byte START = 1;
public static final byte END = 2;
public static final byte STOP = 3;
- public static final byte PSEUDOREAR = 4;
+ public static final byte FROMPSEUDOHEAD = 4;
+ public static final byte FROMPSEUDOREAR = 5;
+ public static final byte FROMSELF = 6;
public final static class MESSAGE_CONTENT {
@@ -25,8 +27,14 @@
case STOP:
r = "STOP";
break;
- case PSEUDOREAR:
- r = "PSEUDOREAR";
+ case FROMPSEUDOHEAD:
+ r = "FROMPSEUDOHEAD";
+ break;
+ case FROMPSEUDOREAR:
+ r = "FROMPSEUDOREAR";
+ break;
+ case FROMSELF:
+ r = "FROMSELF";
break;
}
return r;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index c1f4696..6730299 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -5,7 +5,7 @@
public static final byte NON_VERTEX = 0;
public static final byte START_VERTEX = 1;
public static final byte END_VERTEX = 2;
- public static final byte MID_VERTEX = 3;
+ public static final byte START_HALT = 3;
public static final byte PSEUDOHEAD = 4;
public static final byte PSEUDOREAR = 5;
public static final byte FINAL_VERTEX = 6;
@@ -25,8 +25,8 @@
case END_VERTEX:
r = "END_VERTEX";
break;
- case MID_VERTEX:
- r = "MID_VERTEX";
+ case START_HALT:
+ r = "START_HALT";
break;
case PSEUDOHEAD:
r = "PSEUDOHEAD";
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 8b138f2..809ea34 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -11,6 +11,7 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.P3ForPathMergeVertex;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -50,10 +51,30 @@
private static void genLogAlgorithmForMergeGraph() throws IOException {
generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
}
+
+ private static void generateP3ForMergeGraphJob(String jobName, String outputPath) throws IOException { // writes the P3 job configuration as XML to outputPath
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(P3ForPathMergeVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.getConfiguration().setInt(P3ForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setFloat(P3ForPathMergeVertex.PSEUDORATE, 0.4f); // test value; differs from the 0.2f fallback in P3ForPathMergeVertex.initVertex()
+ job.getConfiguration().setInt(P3ForPathMergeVertex.MAXROUND, 1); // test value; differs from the fallback of 2 in P3ForPathMergeVertex.initVertex()
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath))); // NOTE(review): this FileOutputStream is never closed in this method
+ }
+
+ private static void genP3ForMergeGraph() throws IOException { // generates P3ForMergeGraph.xml under outputBase
+ generateP3ForMergeGraphJob("P3ForMergeGraph", outputBase
+ + "P3ForMergeGraph.xml");
+ }
public static void main(String[] args) throws IOException {
- genNaiveAlgorithmForMergeGraph();
- genLogAlgorithmForMergeGraph();
+ //genNaiveAlgorithmForMergeGraph();
+ //genLogAlgorithmForMergeGraph();
+ genP3ForMergeGraph(); // only the P3 job XML is generated now; the Naive and Log generators above are commented out
}
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
index dcbbb79..c8aaa84 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/PathMergeSmallTestSuite.java
@@ -45,19 +45,18 @@
public static final String PreFix = "data/PathTestSet"; //"graphbuildresult";
public static final String[] TestDir = { PreFix + File.separator
- //+ "split.aa"};
- //+ "split.aa"};/*, PreFix + File.separator
+ + "LongPath"};/*, PreFix + File.separator
/*+ "CyclePath"};, PreFix + File.separator
+ "SimplePath", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
+ "TreePath"};*/
- + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
+ /* + "2", PreFix + File.separator + "3", PreFix + File.separator + "4", PreFix + File.separator + "5",
PreFix + File.separator + "6", PreFix + File.separator + "7", PreFix + File.separator + "8",
PreFix + File.separator + "9", PreFix + File.separator + "TwoKmer", PreFix + File.separator + "ThreeKmer",
PreFix + File.separator + "SinglePath", PreFix + File.separator + "SimplePath",
PreFix + File.separator + "Path", PreFix + File.separator + "BridgePath",
PreFix + File.separator + "CyclePath", PreFix + File.separator + "RingPath",
- PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };
+ PreFix + File.separator + "LongPath", PreFix + File.separator + "TreePath" };*/
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";