add test to filter specific path
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 8075238..39e1e41 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -34,13 +34,14 @@
public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
-
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
if(vertex.getVertexValue().getState() != State.FINAL_DELETE
- && vertex.getVertexValue().getState() != State.END_VERTEX)
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ && vertex.getVertexValue().getState() != State.END_VERTEX){
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ }
+
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index eea7c1c..1812b57 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -14,6 +14,8 @@
public class NaiveAlgorithmForPathMergeOutputFormat extends
BinaryVertexOutputFormat<KmerBytesWritable, ValueStateWritable, NullWritable> {
+
+
@Override
public VertexWriter<KmerBytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
@@ -31,12 +33,11 @@
public BinaryLoadGraphVertexWriter(RecordWriter<KmerBytesWritable, ValueStateWritable> lineRecordWriter) {
super(lineRecordWriter);
}
-
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- //if(vertex.getVertexValue().getLengthOfMergeChain() != NaiveAlgorithmForPathMergeVertex.kmerSize)
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ //if(vertex.getVertexValue().isOp() == true)
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
index 298864e..237cca7 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/graph/Graph.java
@@ -50,6 +50,10 @@
public static void main(String[] args) throws Exception
{
Graph g = new Graph();
- g.start("result.txt.txt");
+ g.start("BridgePath_7");
+ g.start("CyclePath_7");
+ g.start("SimplePath_7");
+ g.start("SinglePath_7");
+ g.start("TreePath_7");
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index a1a15a6..fc833f8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -15,10 +15,14 @@
private byte adjMap;
private byte state;
private VKmerBytesWritable mergeChain;
+
+ //extra - for test
+ //private boolean isOp;
public ValueStateWritable() {
state = State.NON_VERTEX;
mergeChain = new VKmerBytesWritable(0);
+ //isOp = false;
}
public ValueStateWritable(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
@@ -65,11 +69,20 @@
this.mergeChain.set(mergeChain);
}
+ /*public boolean isOp() {
+ return isOp;
+ }
+
+ public void setOp(boolean isOp) {
+ this.isOp = isOp;
+ }*/
+
@Override
public void readFields(DataInput in) throws IOException {
adjMap = in.readByte();
state = in.readByte();
mergeChain.readFields(in);
+ //isOp = in.readBoolean();
}
@Override
@@ -77,6 +90,7 @@
out.writeByte(adjMap);
out.writeByte(state);
mergeChain.write(out);
+ //out.writeBoolean(isOp);
}
@Override
@@ -86,8 +100,6 @@
@Override
public String toString() {
- //if(mergeChain.getKmerLength() == -1 || mergeChain.getKmerLength() == 0)
- // return GeneCode.getSymbolFromBitMap(adjMap);
return GeneCode.getSymbolFromBitMap(adjMap) + "\t" +
getLengthOfMergeChain() + "\t" +
mergeChain.toString();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
new file mode 100644
index 0000000..2dbc180
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
@@ -0,0 +1,347 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.Message;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ValueStateWritable
+ * edgeValue: NullWritable
+ * message: LogAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+public class LogFilterVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "TwoStepLogAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "TwoStepLogAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getNextDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * head send message to all previous nodes
+ */
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
+ destVertexId.set(getPreDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+
+ /**
+ * set vertex state
+ */
+ public void setState(){
+ if(msg.getMessage() == Message.START &&
+ (getVertexValue().getState() == State.MID_VERTEX || getVertexValue().getState() == State.END_VERTEX)){
+ getVertexValue().setState(State.START_VERTEX);
+ setVertexValue(getVertexValue());
+ }
+ else if(msg.getMessage() == Message.END && getVertexValue().getState() == State.MID_VERTEX){
+ getVertexValue().setState(State.END_VERTEX);
+ setVertexValue(getVertexValue());
+ voteToHalt();
+ }
+ else
+ voteToHalt();
+ }
+ /**
+ * send start message to next node
+ */
+ public void sendStartMsgToNextNode(){
+ msg.reset();
+ msg.setMessage(Message.START);
+ msg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send end message to next node
+ */
+ public void sendEndMsgToNextNode(){
+ msg.reset();
+ msg.setMessage(Message.END);
+ msg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, msg);
+ voteToHalt();
+ }
+ /**
+ * send non message to next node
+ */
+ public void sendNonMsgToNextNode(){
+ msg.setMessage(Message.NON);
+ msg.setSourceVertexId(getVertexId());
+ sendMsg(destVertexId, msg);
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
+ if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1
+ || getVertexValue().getState() == State.FINAL_VERTEX) //|| lastKmer == null
+ voteToHalt();
+ else{
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
+ if(getVertexValue().getState() == State.START_VERTEX){
+ sendStartMsgToNextNode();
+ }
+ else if(getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ sendEndMsgToNextNode();
+ }
+ }
+ }
+ /**
+ * path send message to head
+ */
+ public void responseMsgToHeadVertex(){
+ if(getVertexValue().getLengthOfMergeChain() == 0){
+ getVertexValue().setMergeChain(getVertexId());
+ setVertexValue(getVertexValue());
+ }
+ destVertexId.set(msg.getSourceVertexId());
+ msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
+ setMessageType(msg.getMessage());
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * set message type
+ */
+ public void setMessageType(int message){
+ //kill Message because it has been merged by the head
+ if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ msg.setMessage(Message.END);
+ getVertexValue().setState(State.FINAL_DELETE);
+ setVertexValue(getVertexValue());
+ }
+ else
+ msg.setMessage(Message.NON);
+
+ if(message == Message.START){
+ deleteVertex(getVertexId());
+ }
+ }
+ /**
+ * set vertexValue's state chainVertexId, value
+ */
+ public boolean setVertexValueAttributes(){
+ if(msg.getMessage() == Message.END){
+ if(getVertexValue().getState() != State.START_VERTEX)
+ getVertexValue().setState(State.END_VERTEX);
+ else
+ getVertexValue().setState(State.FINAL_VERTEX);
+ }
+
+ if(getSuperstep() == 5)
+ chainVertexId.set(getVertexId());
+ else
+ chainVertexId.set(getVertexValue().getMergeChain());
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+ chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
+ if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
+ getVertexValue().setMergeChain(null);
+ getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
+ chainVertexId.getGeneCodeAtPosition(kmerSize)));
+ getVertexValue().setState(State.CYCLE);
+ return false;
+ }
+ else
+ getVertexValue().setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
+ getVertexValue().setAdjMap(tmpVertexValue);
+ return true;
+ }
+ /**
+ * send message to self
+ */
+ public void sendMsgToSelf(){
+ if(msg.getMessage() != Message.END){
+ setVertexValue(getVertexValue());
+ msg.reset(); //reset
+ msg.setAdjMap(getVertexValue().getAdjMap());
+ sendMsg(getVertexId(),msg);
+ }
+ }
+ /**
+ * start sending message
+ */
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
+ msg.set(null, null, (byte)0, Message.START);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
+ msg.set(null, null, (byte)0, Message.END);
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
+ voteToHalt();
+ }
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ getVertexValue().setState(State.MID_VERTEX);
+ setVertexValue(getVertexValue());
+ }
+ }
+ /**
+ * initiate head, rear and path node
+ */
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ msgIterator.next();
+ voteToHalt();
+ }
+ else{
+ msg = msgIterator.next();
+ setState();
+ }
+ }
+ }
+ /**
+ * head send message to path
+ */
+ public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
+ sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
+ }
+ else{
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ if(mergeChainVertex(msgIterator))
+ sendMsgToPathVertex(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
+ else
+ voteToHalt();
+ }
+ if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
+ voteToHalt();
+ }
+ if(getVertexValue().getState() == State.FINAL_VERTEX){
+ //String source = getVertexValue().getMergeChain().toString();
+ voteToHalt();
+ }
+ }
+ }
+ /**
+ * path response message to head
+ */
+ public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ }
+ else{
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ }
+ }
+ }
+ /**
+ * merge chainVertex and store in vertexVal.chainVertexId
+ */
+ public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ return setVertexValueAttributes();
+ }
+
+ @Override
+ public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1){
+ if(getVertexId().toString().equals("AAGAC")
+ || getVertexId().toString().equals("AGCAC")){
+ startSendMsg();
+ }
+ }
+ else if(getSuperstep() == 2){
+ initState(msgIterator);
+ if(getVertexValue().getState() == State.NON_VERTEX)
+ voteToHalt();
+ }
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ }
+ else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ }
+ else
+ voteToHalt();
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index 5187f1c..ea89b06 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -156,10 +156,6 @@
@Override
public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
initVertex();
- /*if(getVertexId().toString().equals("TCCTACGAATTAATGCTCTCCCACGCAACATCACCATATCTTCACGGGAGAGCCG"))
- System.out.println();
- if(getVertexId().toString().equals("ATCCTACGAATTAATGCTCTCCCACGCAACATCACCATATCTTCACGGGAGAGCC"))
- System.out.println();*/
if (getSuperstep() == 1) {
if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
new file mode 100644
index 0000000..6efd961
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
@@ -0,0 +1,196 @@
+package edu.uci.ics.genomix.pregelix.operator;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.type.State;
+import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: NaiveAlgorithmMessageWritable
+ *
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ *
+ * succeed node
+ * A 00000001 1
+ * G 00000010 2
+ * C 00000100 4
+ * T 00001000 8
+ * precursor node
+ * A 00010000 16
+ * G 00100000 32
+ * C 01000000 64
+ * T 10000000 128
+ *
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ */
+/**
+ * Naive Algorithm for path merge graph
+ */
+public class NaiveFilterVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
+
+ public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
+ public static int kmerSize = -1;
+ private int maxIteration = -1;
+
+ private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
+ private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
+ /**
+ * initiate kmerSize, maxIteration
+ */
+ public void initVertex(){
+ if(kmerSize == -1)
+ kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
+ }
+ public void findDestination(){
+ destVertexId.set(msg.getSourceVertexId());
+ }
+ /**
+ * get destination vertex
+ */
+ public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
+ return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
+ }
+
+ public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
+ /**
+ * head send message to all next nodes
+ */
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
+ destVertexId.set(getDestVertexId(vertexId, x));
+ sendMsg(destVertexId, msg);
+ }
+ }
+ }
+ /**
+ * initiate chain vertex
+ */
+ public void initChainVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ chainVertexId.set(getVertexId());
+ msg.set(getVertexId(), chainVertexId, getVertexId(), getVertexValue().getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
+ voteToHalt();
+ }
+ }
+ /**
+ * head node sends message to path node
+ */
+ public void sendMsgToPathVertex(){
+ if(!msg.isRear()){
+ destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
+ msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ }else{
+ destVertexId.set(msg.getHeadVertexId());
+ msg.set(msg.getSourceVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
+ }
+ sendMsg(destVertexId,msg);
+ }
+ /**
+ * path node sends message back to head node
+ */
+ public void responseMsgToHeadVertex(){
+ if(!msg.isRear()){
+ findDestination();
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
+ chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
+ getVertexId().getGeneCodeAtPosition(kmerSize - 1));
+ deleteVertex(getVertexId());
+ msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), getVertexValue().getAdjMap(), false);
+ sendMsg(destVertexId,msg);
+ }
+ else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
+ msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
+ sendMsg(destVertexId,msg);
+ }
+ }else{// is Rear
+ if(msg.getLengthOfChain() > kmerSize){
+ byte tmp = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getAdjMap(), msg.getSourceVertexId(), kmerSize);
+ getVertexValue().set(tmp, State.FINAL_VERTEX, msg.getChainVertexId());
+ setVertexValue(getVertexValue());
+ //String source = msg.getChainVertexId().toString();
+ //System.out.print("");
+ }
+ }
+ }
+
+ @Override
+ public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
+ initVertex();
+ if (getSuperstep() == 1) {
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
+ if(getVertexId().toString().equals("AAGAC")){
+ //getVertexValue().setOp(true);
+ //setVertexValue(getVertexValue());
+ msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
+ }
+ else
+ voteToHalt();
+ }
+ }
+ else if(getSuperstep() == 2){
+ if(msgIterator.hasNext()){
+ //getVertexValue().setOp(true);
+ //setVertexValue(getVertexValue());
+ msg = msgIterator.next();
+ initChainVertex();
+
+ }
+ }
+ //head node sends message to path node
+ else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
+ while (msgIterator.hasNext()){
+ //getVertexValue().setOp(true);
+ //setVertexValue(getVertexValue());
+ msg = msgIterator.next();
+ sendMsgToPathVertex();
+ }
+ }
+ //path node sends message back to head node
+ else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
+ while(msgIterator.hasNext()){
+ //getVertexValue().setOp(true);
+ //setVertexValue(getVertexValue());
+ msg = msgIterator.next();
+ responseMsgToHeadVertex();
+ }
+ }
+ voteToHalt();
+ }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
index 712b614..c5788ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
@@ -327,10 +327,6 @@
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
initVertex();
- /*if(getVertexId().toString().equals("TCCTACGAATTAATGCTCTCCCACGCAACATCACCATATCTTCACGGGAGAGCCG"))
- System.out.println();
- if(getVertexId().toString().equals("ATCCTACGAATTAATGCTCTCCCACGCAACATCACCATATCTTCACGGGAGAGCC"))
- System.out.println();*/
if (getSuperstep() == 1)
startSendMsg();
else if(getSuperstep() == 2)
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
index 79538b8..0709249 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/testcase/GenerateTestInput.java
@@ -55,20 +55,20 @@
// TODO Auto-generated method stub
OutputStreamWriter writer;
try {
- writer = new OutputStreamWriter(new FileOutputStream("graph/55/SinglePath_55"));
- writer.write(simplePath(55,60,1));
+ writer = new OutputStreamWriter(new FileOutputStream("graph/7/SinglePath"));
+ writer.write(simplePath(7,10,1));
writer.close();
- writer = new OutputStreamWriter(new FileOutputStream("graph/55/SimplePath_55"));
- writer.write(simplePath(55,60,3));
+ writer = new OutputStreamWriter(new FileOutputStream("graph/7/SimplePath"));
+ writer.write(simplePath(7,10,3));
writer.close();
- writer = new OutputStreamWriter(new FileOutputStream("graph/55/TreePath_55"));
- writer.write(treePath(55, 5, 5, 3));
+ writer = new OutputStreamWriter(new FileOutputStream("graph/7/TreePath"));
+ writer.write(treePath(7, 7, 7, 7));
writer.close();
- writer = new OutputStreamWriter(new FileOutputStream("graph/55/CyclePath_55"));
- writer.write(cyclePath(55,60));
+ writer = new OutputStreamWriter(new FileOutputStream("graph/7/CyclePath"));
+ writer.write(cyclePath(7,10));
writer.close();
- writer = new OutputStreamWriter(new FileOutputStream("graph/55/BridgePath_55"));
- writer.write(bridgePath(55,2));
+ writer = new OutputStreamWriter(new FileOutputStream("graph/7/BridgePath"));
+ writer.write(bridgePath(7,2));
writer.close();
} catch (IOException e) {
e.printStackTrace();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
new file mode 100644
index 0000000..a4c17b2
--- /dev/null
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.genomix.pregelix.JobGen;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
+import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
+import edu.uci.ics.genomix.pregelix.operator.LogFilterVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.operator.NaiveFilterVertex;
+import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+
+public class FilterJobGenerator {
+
+ public static String outputBase = "src/test/resources/jobs/";
+
+ private static void generateNaiveFilterJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(NaiveFilterVertex.class);
+ job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genNaiveFilter() throws IOException {
+ generateNaiveFilterJob("NaiveFilterVertex", outputBase + "NaiveFilterVertex.xml");
+ }
+
+ private static void generateLogFilterJob(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(LogFilterVertex.class);
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setDynamicVertexValueSize(true);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genLogFilter() throws IOException {
+ generateLogFilterJob("LogFilterVertex", outputBase + "LogFilterVertex.xml");
+ }
+
+ /**
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ genNaiveFilter();
+ genLogFilter();
+ }
+
+}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 68e29ee..78cc005 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -27,7 +27,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 7);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -43,7 +43,7 @@
job.setDynamicVertexValueSize(true);
job.setOutputKeyClass(KmerBytesWritable.class);
job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 7);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
index d79bb91..af181d9 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/GraphBuildTest.java
@@ -34,13 +34,14 @@
private static final String ACTUAL_RESULT_DIR = "graphbuildresult";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "data/TreePath_55";
+ private static final String DATA_PATH = "graph/7/TreePath";
private static final String HDFS_INPUT_PATH = "/test";
private static final String HDFS_OUTPUT_PATH = "/result";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+ HDFS_OUTPUT_PATH + "/result.txt";
- private static final String CONVERT_RESULT = "graph/result.txt.txt";
+ private static final String CONVERT_RESULT = ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH + "/result.txt.txt";
private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
@@ -64,7 +65,7 @@
FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
- conf.setInt(GenomixJob.KMER_LENGTH, 55);
+ conf.setInt(GenomixJob.KMER_LENGTH, 7);
driver = new Driver(
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
@@ -114,24 +115,17 @@
@Test
public void TestAll() throws Exception {
cleanUpReEntry();
- TestHybridGroupby();
- cleanUpReEntry();
TestPreClusterGroupby();
}
public void TestPreClusterGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
+ //conf.set(GenomixJob.OUTPUT_FORMAT, "text");
System.err.println("Testing PreClusterGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_PATH));
}
- public void TestHybridGroupby() throws Exception {
- conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- System.err.println("Testing HybridGroupBy");
- driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
- Assert.assertEquals(true, checkResults(EXPECTED_PATH));
- }
private boolean checkResults(String expectedPath) throws Exception {
File dumped = null;
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
index 217f59a..5e99cc9 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/MergePathTest.java
@@ -58,13 +58,18 @@
if (key == null || value == null){
break;
}
- /*bw.write(value.getLengthOfMergeChain() + "\t" +
- value.getMergeChain().toString() + "\t" +
- GeneCode.getSymbolFromBitMap(value.getAdjMap()) + "\t" +
- key.toString());*/
- bw.write(key.toString() + "\t" +
- value.toString());
- bw.newLine();
+ if(value.getLengthOfMergeChain() != 0
+ && value.getLengthOfMergeChain() != -1){
+ //&& value.getState() == State.FINAL_VERTEX){
+ //bw.write(key.toString() + "\t" +
+ // value.toString());
+ bw.write(value.getLengthOfMergeChain() + "\t" +
+ value.getMergeChain().toString() + "\t" +
+ GeneCode.getSymbolFromBitMap(value.getAdjMap()) + "\t" +
+ value.getState());
+ //+ "\t" + key.toString());
+ bw.newLine();
+ }
}
reader.close();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
index 4b6d367..52fd4c7 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
@@ -77,7 +77,7 @@
private void compareResults() throws Exception {
dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(
resultFileDir));
- GenerateTextFile.generateFromPathmergeResult(55, resultFileDir, textFileDir);
+ GenerateTextFile.generateFromPathmergeResult(7, resultFileDir, textFileDir);
// TestUtils.compareWithResultDir(new File(expectedFileDir), new
// File(resultFileDir));
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
index eee9e39..901706e 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
@@ -44,10 +44,23 @@
private static final Logger LOGGER = Logger
.getLogger(PathMergeSmallTestSuite.class.getName());
- public static final String PreFix = "data/input";
+ public static final String PreFix = "data/7";
public static final String[] TestDir = { PreFix + File.separator
- + "test"};
- /*+ "TwoKmer", PreFix + File.separator
+ + "BridgePath", PreFix + File.separator
+ + "CyclePath", PreFix + File.separator
+ + "SimplePath", PreFix + File.separator
+ + "SinglePath", PreFix + File.separator
+ + "TreePath"};
+ /*+ "2", PreFix + File.separator
+ + "3", PreFix + File.separator
+ + "4", PreFix + File.separator
+ + "5", PreFix + File.separator
+ + "6", PreFix + File.separator
+ + "7", PreFix + File.separator
+ + "8", PreFix + File.separator
+ + "9", PreFix + File.separator
+ + "10"}; , PreFix + File.separator
+ + "TwoKmer", PreFix + File.separator
+ "ThreeKmer", PreFix + File.separator
+ "SinglePath", PreFix + File.separator
+ "SimplePath", PreFix + File.separator