add incomingMsg and outgoingMsg and do some optimization after code review
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
index aaa7e53..322af31 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/client/Client.java
@@ -10,7 +10,6 @@
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import edu.uci.ics.genomix.pregelix.operator.NaiveFilterVertex;
import edu.uci.ics.genomix.pregelix.operator.ThreeStepLogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 69352a1..a002179 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.pregelix.operator.ThreeStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -26,8 +26,8 @@
private byte checkMessage;
public LogAlgorithmMessageWritable(){
- sourceVertexId = new VKmerBytesWritable(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
+ sourceVertexId = new VKmerBytesWritable(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
adjMap = 0;
message = 0;
checkMessage = 0;
@@ -52,7 +52,7 @@
public void reset(){
checkMessage = 0;
- chainVertexId.reset(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(TwoStepLogAlgorithmForPathMergeVertex.kmerSize);
adjMap = (byte)0;
message = 0;
}
@@ -73,7 +73,10 @@
}
public void setAdjMap(byte adjMap) {
- this.adjMap = adjMap;
+ if(adjMap != 0){
+ checkMessage |= CheckMessage.ADJMAP;
+ this.adjMap = adjMap;
+ }
}
public VKmerBytesWritable getChainVertexId() {
@@ -81,7 +84,10 @@
}
public void setChainVertexId(VKmerBytesWritable chainVertexId) {
- this.chainVertexId.set(chainVertexId);
+ if(chainVertexId != null){
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
}
public byte getMessage() {
@@ -110,6 +116,7 @@
@Override
public void readFields(DataInput in) throws IOException {
+ this.reset();
checkMessage = in.readByte();
if((checkMessage & CheckMessage.SOURCE) != 0)
sourceVertexId.readFields(in);
@@ -127,7 +134,7 @@
@Override
public boolean equals(Object o) {
- if (o instanceof NaiveAlgorithmMessageWritable) {
+ if (o instanceof LogAlgorithmMessageWritable) {
LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
return chainVertexId.equals(tp.chainVertexId);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java
new file mode 100644
index 0000000..a86ffe0
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java
@@ -0,0 +1,148 @@
+package edu.uci.ics.genomix.pregelix.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+
+public class P1MessageWritable implements WritableComparable<P1MessageWritable>{
+ /**
+ * sourceVertexId stores source vertexId when headVertex sends the message
+ * stores neighber vertexValue when pathVertex sends the message
+ * file stores the point to the file that stores the chains of connected DNA
+ */
+ private KmerBytesWritable sourceVertexId;
+ private byte adjMap;
+ private byte lastGeneCode;
+ private byte message;
+
+ private byte checkMessage;
+
+ public P1MessageWritable(){
+ sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
+ lastGeneCode = (byte)0;
+ message = Message.NON;
+ checkMessage = (byte)0;
+ }
+
+ public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, byte message){
+ checkMessage = 0;
+ if(sourceVertexId != null){
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId);
+ }
+ if(adjMap != 0){
+ checkMessage |= CheckMessage.ADJMAP;
+ this.adjMap = adjMap;
+ }
+ if(lastGeneCode != 0){
+ checkMessage |= CheckMessage.LASTGENECODE;
+ this.lastGeneCode = lastGeneCode;
+ }
+ this.message = message;
+ }
+
+ public void reset(){
+ checkMessage = 0;
+ sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
+ lastGeneCode = (byte)0;
+ message = Message.NON;
+ }
+
+ public KmerBytesWritable getSourceVertexId() {
+ return sourceVertexId;
+ }
+
+ public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
+ if(sourceVertexId != null){
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId);
+ }
+ }
+
+ public byte getAdjMap() {
+ return adjMap;
+ }
+
+ public void setAdjMap(byte adjMap) {
+ if(adjMap != 0){
+ checkMessage |= CheckMessage.ADJMAP;
+ this.adjMap = adjMap;
+ }
+ }
+
+ public byte getLastGeneCode() {
+ return lastGeneCode;
+ }
+
+ public void setLastGeneCode(byte lastGeneCode) {
+ if(lastGeneCode != 0){
+ checkMessage |= CheckMessage.LASTGENECODE;
+ this.lastGeneCode = lastGeneCode;
+ }
+ }
+
+ public byte getMessage() {
+ return message;
+ }
+
+ public void setMessage(byte message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(checkMessage);
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.write(out);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ out.write(adjMap);
+ if((checkMessage & CheckMessage.LASTGENECODE) != 0)
+ out.write(lastGeneCode);
+ out.write(message);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.reset();
+ checkMessage = in.readByte();
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.readFields(in);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ adjMap = in.readByte();
+ if((checkMessage & CheckMessage.LASTGENECODE) != 0)
+ lastGeneCode = in.readByte();
+ message = in.readByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return sourceVertexId.hashCode();
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof P1MessageWritable) {
+ P1MessageWritable tp = (P1MessageWritable) o;
+ return sourceVertexId.equals( tp.sourceVertexId);
+ }
+ return false;
+ }
+ @Override
+ public String toString() {
+ return sourceVertexId.toString();
+ }
+
+ @Override
+ public int compareTo(P1MessageWritable tp) {
+ return sourceVertexId.compareTo(tp.sourceVertexId);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
index f0020bc..12c12af 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
@@ -151,8 +151,8 @@
byte tmp = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getAdjMap(), msg.getSourceVertexId(), kmerSize);
getVertexValue().set(tmp, State.FINAL_VERTEX, msg.getChainVertexId());
setVertexValue(getVertexValue());
- String source = msg.getChainVertexId().toString();
- System.out.print("");
+ //String source = msg.getChainVertexId().toString();
+ //System.out.print("");
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java
new file mode 100644
index 0000000..ca18c75
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java
@@ -0,0 +1,232 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.P1MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class P1ForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, P1MessageWritable>{
+ public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private P1MessageWritable incomingMsg = new P1MessageWritable();
+ private P1MessageWritable outgoingMsg = new P1MessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ outgoingMsg.reset();
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
+ return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getDestVertexId(vertexId, x));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ }
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, outgoingMsg);
+ }
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
+ outgoingMsg.setMessage(Message.START);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ }
+ if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
+ outgoingMsg.setMessage(Message.END);
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ }
+ }
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<P1MessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ incomingMsg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+ /**
+ * set vertex state
+ */
+ public void setState(){
+ if(incomingMsg.getMessage() == Message.START){
+ getVertexValue().setState(State.START_VERTEX);
+ }
+ else if(incomingMsg.getMessage() == Message.END
+ && getVertexValue().getState() != State.START_VERTEX){
+ getVertexValue().setState(State.END_VERTEX);
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * head node sends message to path node
+ */
+ public void sendMsgToPathVertex(Iterator<P1MessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ getVertexValue().setMergeChain(getVertexId());
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+ getVertexValue().getAdjMap()));
+ sendMsg(destVertexId,outgoingMsg);
+ }else{
+ while (msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ if(incomingMsg.getMessage() != Message.STOP){
+ getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+ incomingMsg.getLastGeneCode()));
+ outgoingMsg.setSourceVertexId(getVertexId());
+ destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
+ incomingMsg.getAdjMap()));
+ sendMsg(destVertexId,outgoingMsg);
+ }
+ else{
+ getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
+ incomingMsg.getLastGeneCode()));
+ byte adjMap = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
+ incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(adjMap);
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ }
+ }
+ }
+ }
+ /**
+ * path node sends message back to head node
+ */
+ public void responseMsgToHeadVertex(){
+ deleteVertex(getVertexId());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ if(getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.STOP);
+ sendMsg(incomingMsg.getSourceVertexId(),outgoingMsg);
+ }
+
+ @Override
+ public void compute(Iterator<P1MessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1) {
+ startSendMsg();
+ voteToHalt();
+ }
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ voteToHalt();
+ }
+ else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+ while(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ responseMsgToHeadVertex();
+ }
+ voteToHalt();
+ }
+ }
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P1ForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput
+ */
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java
new file mode 100644
index 0000000..7d5263d
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java
@@ -0,0 +1,274 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class P2ForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+ public static final String KMER_SIZE = "TwoStepLogAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "TwoStepLogAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private LogAlgorithmMessageWritable incomingMsg = new LogAlgorithmMessageWritable();
+ private LogAlgorithmMessageWritable outgoingMsg = new LogAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ outgoingMsg.reset();
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
+ VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
+ return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ sendMsg(getNextDestVertexId(vertexId, x), outgoingMsg);
+ }
+ }
+ }
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ sendMsg(getPreDestVertexId(vertexId, x), outgoingMsg);
+ }
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
+ outgoingMsg.setMessage(Message.START);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
+ outgoingMsg.setMessage(Message.END);
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ }
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ incomingMsg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+ /**
+ * set vertex state
+ */
+ public void setState(){
+ if(incomingMsg.getMessage() == Message.START){
+ getVertexValue().setState(State.START_VERTEX);
+ getVertexValue().setMergeChain(null);
+ }
+ else if(incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX){
+ getVertexValue().setState(State.END_VERTEX);
+ getVertexValue().setMergeChain(getVertexId());
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * head send message to path
+ */
+ public void sendOutMsg(KmerBytesWritable chainVertexId, byte adjMap){
+ if(getVertexValue().getState() == State.START_VERTEX){
+ outgoingMsg.setMessage(Message.START);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ }
+ else if(getVertexValue().getState() != State.END_VERTEX){
+ outgoingMsg.setMessage(Message.NON);
+ outgoingMsg.setSourceVertexId(getVertexId());
+ sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
+ }
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ getVertexValue().setMergeChain(getVertexId());
+ sendOutMsg(getVertexId(), getVertexValue().getAdjMap());
+ }
+ else{
+ if(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ if(mergeChainVertex(msgIterator)){
+ if(incomingMsg.getMessage() == Message.END){
+ if(getVertexValue().getState() == State.START_VERTEX){
+ getVertexValue().setState(State.FINAL_VERTEX);
+ //String source = getVertexValue().getMergeChain().toString();
+ //System.out.println();
+ }
+ else
+ getVertexValue().setState(State.END_VERTEX);
+ }
+ else
+ sendOutMsg(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ }
+ }
+ }
+ }
+ /**
+ * path response message to head
+ */
+ public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ incomingMsg = msgIterator.next();
+ outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
+ outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
+ if(getVertexValue().getState() == State.END_VERTEX)
+ outgoingMsg.setMessage(Message.END);
+ sendMsg(incomingMsg.getSourceVertexId(),outgoingMsg);
+
+ if(incomingMsg.getMessage() == Message.START)
+ deleteVertex(getVertexId());
+ }
+ else{
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX)
+ deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ }
+ }
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ //merge chain
+ lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
+ incomingMsg.getChainVertexId()));
+ chainVertexId.set(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
+ lastKmer));
+ if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
+ getVertexValue().setMergeChain(null);
+ getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
+ chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ getVertexValue().setState(State.CYCLE);
+ return false;
+ }
+ else
+ getVertexValue().setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
+ incomingMsg.getAdjMap());
+ getVertexValue().setAdjMap(tmpVertexValue);
+ return true;
+ }
+ @Override
+ public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ voteToHalt();
+ }
+ else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(TwoStepLogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(P2ForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput~/
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
index 7564d49..6b67333 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
@@ -19,7 +19,6 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
/*
* vertexId: BytesWritable
* vertexValue: ValueStateWritable
@@ -112,7 +111,6 @@
}
}
}
-
/**
* set vertex state
*/
@@ -154,26 +152,28 @@
* send non message to next node
*/
public void sendNonMsgToNextNode(){
+ msg.reset();
msg.setMessage(Message.NON);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
+ voteToHalt();
}
/**
* head send message to path
*/
public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
- if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1
- || getVertexValue().getState() == State.FINAL_VERTEX) //|| lastKmer == null
- voteToHalt();
- else{
+ //if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1
+ // || getVertexValue().getState() == State.FINAL_VERTEX) //|| lastKmer == null
+ // voteToHalt();
+ //else{
destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
if(getVertexValue().getState() == State.START_VERTEX){
sendStartMsgToNextNode();
}
else if(getVertexValue().getState() != State.END_VERTEX){ //FINAL_DELETE
- sendEndMsgToNextNode();
+ sendNonMsgToNextNode();//sendEndMsgToNextNode();
}
- }
+ //}
}
/**
* path send message to head
@@ -254,17 +254,19 @@
if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
msg.set(null, null, (byte)0, Message.START);
sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ //voteToHalt();
}
if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
msg.set(null, null, (byte)0, Message.END);
sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
+ //voteToHalt();
}
if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
getVertexValue().setState(State.MID_VERTEX);
setVertexValue(getVertexValue());
}
+ else
+ voteToHalt();
}
/**
* initiate head, rear and path node
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index d9fd35f..6ae2e30 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -2,13 +2,11 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
index 9cb798e..45741f7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/CheckMessage.java
@@ -7,6 +7,7 @@
public static final byte ADJMAP = 1 << 2;
public static final byte MESSAGE = 1 << 3;
public static final byte STATE = 1 << 4;
+ public static final byte LASTGENECODE = 1 << 5;
public final static class CheckMessage_CONTENT{
@@ -28,6 +29,9 @@
case STATE:
r = "STATE";
break;
+ case LASTGENECODE:
+ r = "LASTGENECODE";
+ break;
}
return r;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index 9e82cc9..b1a9517 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -5,6 +5,7 @@
public static final byte NON = 0;
public static final byte START = 1;
public static final byte END = 2;
+ public static final byte STOP = 3;
public final static class MESSAGE_CONTENT{
@@ -20,6 +21,9 @@
case END:
r = "END";
break;
+ case STOP:
+ r = "STOP";
+ break;
}
return r;
}