Delete extra classes
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java
deleted file mode 100644
index a86ffe0..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/P1MessageWritable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package edu.uci.ics.genomix.pregelix.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.type.CheckMessage;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-
-public class P1MessageWritable implements WritableComparable<P1MessageWritable>{
- /**
- * sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * file stores the point to the file that stores the chains of connected DNA
- */
- private KmerBytesWritable sourceVertexId;
- private byte adjMap;
- private byte lastGeneCode;
- private byte message;
-
- private byte checkMessage;
-
- public P1MessageWritable(){
- sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte)0;
- lastGeneCode = (byte)0;
- message = Message.NON;
- checkMessage = (byte)0;
- }
-
- public void set(KmerBytesWritable sourceVertex, byte adjMap, byte lastGeneCode, byte message){
- checkMessage = 0;
- if(sourceVertexId != null){
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- if(adjMap != 0){
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- if(lastGeneCode != 0){
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- this.message = message;
- }
-
- public void reset(){
- checkMessage = 0;
- sourceVertexId = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte)0;
- lastGeneCode = (byte)0;
- message = Message.NON;
- }
-
- public KmerBytesWritable getSourceVertexId() {
- return sourceVertexId;
- }
-
- public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- if(sourceVertexId != null){
- checkMessage |= CheckMessage.SOURCE;
- this.sourceVertexId.set(sourceVertexId);
- }
- }
-
- public byte getAdjMap() {
- return adjMap;
- }
-
- public void setAdjMap(byte adjMap) {
- if(adjMap != 0){
- checkMessage |= CheckMessage.ADJMAP;
- this.adjMap = adjMap;
- }
- }
-
- public byte getLastGeneCode() {
- return lastGeneCode;
- }
-
- public void setLastGeneCode(byte lastGeneCode) {
- if(lastGeneCode != 0){
- checkMessage |= CheckMessage.LASTGENECODE;
- this.lastGeneCode = lastGeneCode;
- }
- }
-
- public byte getMessage() {
- return message;
- }
-
- public void setMessage(byte message) {
- this.message = message;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeByte(checkMessage);
- if((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.write(out);
- if((checkMessage & CheckMessage.ADJMAP) != 0)
- out.write(adjMap);
- if((checkMessage & CheckMessage.LASTGENECODE) != 0)
- out.write(lastGeneCode);
- out.write(message);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.reset();
- checkMessage = in.readByte();
- if((checkMessage & CheckMessage.SOURCE) != 0)
- sourceVertexId.readFields(in);
- if((checkMessage & CheckMessage.ADJMAP) != 0)
- adjMap = in.readByte();
- if((checkMessage & CheckMessage.LASTGENECODE) != 0)
- lastGeneCode = in.readByte();
- message = in.readByte();
- }
-
- @Override
- public int hashCode() {
- return sourceVertexId.hashCode();
- }
- @Override
- public boolean equals(Object o) {
- if (o instanceof P1MessageWritable) {
- P1MessageWritable tp = (P1MessageWritable) o;
- return sourceVertexId.equals( tp.sourceVertexId);
- }
- return false;
- }
- @Override
- public String toString() {
- return sourceVertexId.toString();
- }
-
- @Override
- public int compareTo(P1MessageWritable tp) {
- return sourceVertexId.compareTo(tp.sourceVertexId);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
deleted file mode 100644
index 6fef3a6..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LoadGraphVertex.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class LoadGraphVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
-
- /**
- * For test, just output original file
- */
- @Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LoadGraphVertex.class.getSimpleName());
- job.setVertexClass(LoadGraphVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
deleted file mode 100644
index 62de480..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogFilterVertex.java
+++ /dev/null
@@ -1,347 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class LogFilterVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
- public static final String KMER_SIZE = "TwoStepLogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "TwoStepLogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- destVertexId.set(getNextDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState(){
- if(msg.getMessage() == Message.START &&
- (getVertexValue().getState() == State.MID_VERTEX || getVertexValue().getState() == State.END_VERTEX)){
- getVertexValue().setState(State.START_VERTEX);
- setVertexValue(getVertexValue());
- }
- else if(msg.getMessage() == Message.END && getVertexValue().getState() == State.MID_VERTEX){
- getVertexValue().setState(State.END_VERTEX);
- setVertexValue(getVertexValue());
- voteToHalt();
- }
- else
- voteToHalt();
- }
- /**
- * send start message to next node
- */
- public void sendStartMsgToNextNode(){
- msg.reset();
- msg.setMessage(Message.START);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send end message to next node
- */
- public void sendEndMsgToNextNode(){
- msg.reset();
- msg.setMessage(Message.END);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send non message to next node
- */
- public void sendNonMsgToNextNode(){
- msg.setMessage(Message.NON);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
- if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1
- || getVertexValue().getState() == State.FINAL_VERTEX) //|| lastKmer == null
- voteToHalt();
- else{
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
- if(getVertexValue().getState() == State.START_VERTEX){
- sendStartMsgToNextNode();
- }
- else if(getVertexValue().getState() != State.END_VERTEX){
- sendEndMsgToNextNode();
- }
- }
- }
- /**
- * path send message to head
- */
- public void responseMsgToHeadVertex(){
- if(getVertexValue().getLengthOfMergeChain() == 0){
- getVertexValue().setMergeChain(getVertexId());
- setVertexValue(getVertexValue());
- }
- destVertexId.set(msg.getSourceVertexId());
- msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
- setMessageType(msg.getMessage());
- sendMsg(destVertexId,msg);
- }
- /**
- * set message type
- */
- public void setMessageType(int message){
- //kill Message because it has been merged by the head
- if(getVertexValue().getState() == State.END_VERTEX){
- msg.setMessage(Message.END);
- getVertexValue().setState(State.END_VERTEX);
- setVertexValue(getVertexValue());
- }
- else
- msg.setMessage(Message.NON);
-
- if(message == Message.START){
- deleteVertex(getVertexId());
- }
- }
- /**
- * set vertexValue's state chainVertexId, value
- */
- public boolean setVertexValueAttributes(){
- if(msg.getMessage() == Message.END){
- if(getVertexValue().getState() != State.START_VERTEX)
- getVertexValue().setState(State.END_VERTEX);
- else
- getVertexValue().setState(State.FINAL_VERTEX);
- }
-
- if(getSuperstep() == 5)
- chainVertexId.set(getVertexId());
- else
- chainVertexId.set(getVertexValue().getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
- getVertexValue().setMergeChain(null);
- getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
- chainVertexId.getGeneCodeAtPosition(kmerSize)));
- getVertexValue().setState(State.CYCLE);
- return false;
- }
- else
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
- return true;
- }
- /**
- * send message to self
- */
- public void sendMsgToSelf(){
- if(msg.getMessage() != Message.END){
- setVertexValue(getVertexValue());
- msg.reset(); //reset
- msg.setAdjMap(getVertexValue().getAdjMap());
- sendMsg(getVertexId(),msg);
- }
- }
- /**
- * start sending message
- */
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- getVertexValue().setState(State.MID_VERTEX);
- setVertexValue(getVertexValue());
- }
- }
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- msg = msgIterator.next();
- setState();
- }
- }
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
- sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- if(mergeChainVertex(msgIterator))
- sendMsgToPathVertex(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
- else
- voteToHalt();
- }
- if(getVertexValue().getState() == State.END_VERTEX){
- voteToHalt();
- }
- if(getVertexValue().getState() == State.FINAL_VERTEX){
- //String source = getVertexValue().getMergeChain().toString();
- voteToHalt();
- }
- }
- }
- /**
- * path response message to head
- */
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX){
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
- }
- }
- }
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- return setVertexValueAttributes();
- }
-
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1){
- if(getVertexId().toString().equals("AAGAC")
- || getVertexId().toString().equals("AGCAC")){
- startSendMsg();
- }
- }
- else if(getSuperstep() == 2){
- initState(msgIterator);
- if(getVertexValue().getState() == State.NON_VERTEX)
- voteToHalt();
- }
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- sendMsgToPathVertex(msgIterator);
- }
- else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
- responseMsgToHeadVertex(msgIterator);
- }
- else
- voteToHalt();
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
deleted file mode 100644
index 12c12af..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveFilterVertex.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.NaiveAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class NaiveFilterVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, NaiveAlgorithmMessageWritable>{
-
- public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
- public static final String FILTERKMER = "NaiveFilterVertex.filterKmer";
- public static int kmerSize = -1;
- private int maxIteration = -1;
- private String filterKmer = "";
-
- private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
-
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- if(filterKmer.equals(""))
- filterKmer = getContext().getConfiguration().get(FILTERKMER, "");
- }
- public void findDestination(){
- destVertexId.set(msg.getSourceVertexId());
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
- /**
- * initiate chain vertex
- */
- public void initChainVertex(){
- if(!msg.isRear()){
- findDestination();
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- chainVertexId.set(getVertexId());
- msg.set(getVertexId(), chainVertexId, getVertexId(), getVertexValue().getAdjMap(), false);
- sendMsg(destVertexId,msg);
- }else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
- voteToHalt();
- }
- }
- /**
- * head node sends message to path node
- */
- public void sendMsgToPathVertex(){
- if(!msg.isRear()){
- destVertexId.set(getDestVertexIdFromChain(msg.getChainVertexId(), msg.getAdjMap()));
- msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
- }else{
- destVertexId.set(msg.getHeadVertexId());
- msg.set(msg.getSourceVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, msg.isRear());
- }
- sendMsg(destVertexId,msg);
- }
- /**
- * path node sends message back to head node
- */
- public void responseMsgToHeadVertex(){
- if(!msg.isRear()){
- findDestination();
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
- getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- deleteVertex(getVertexId());
- msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), getVertexValue().getAdjMap(), false);
- sendMsg(destVertexId,msg);
- }
- else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
- sendMsg(destVertexId,msg);
- }
- }else{// is Rear
- if(msg.getLengthOfChain() > kmerSize){
- byte tmp = GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getAdjMap(), msg.getSourceVertexId(), kmerSize);
- getVertexValue().set(tmp, State.FINAL_VERTEX, msg.getChainVertexId());
- setVertexValue(getVertexValue());
- //String source = msg.getChainVertexId().toString();
- //System.out.print("");
- }
- }
- }
-
- @Override
- public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- if(getVertexId().toString().equals(filterKmer)){
- getVertexValue().setState(State.FILTER);
- setVertexValue(getVertexValue());
- msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- }
- else
- voteToHalt();
- }
- }
- else if(getSuperstep() == 2){
- if(msgIterator.hasNext()){
- getVertexValue().setState(State.FILTER);
- setVertexValue(getVertexValue());
- msg = msgIterator.next();
- initChainVertex();
-
- }
- }
- //head node sends message to path node
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- while (msgIterator.hasNext()){
- getVertexValue().setState(State.FILTER);
- setVertexValue(getVertexValue());
- msg = msgIterator.next();
- sendMsgToPathVertex();
- }
- }
- //path node sends message back to head node
- else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
- while(msgIterator.hasNext()){
- getVertexValue().setState(State.FILTER);
- setVertexValue(getVertexValue());
- msg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- }
- voteToHalt();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(NaiveFilterVertex.class.getSimpleName());
- job.setVertexClass(NaiveFilterVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java
deleted file mode 100644
index ca18c75..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P1ForPathMergeVertex.java
+++ /dev/null
@@ -1,232 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.P1MessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ByteWritable
- * edgeValue: NullWritable
- * message: NaiveAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-/**
- * Naive Algorithm for path merge graph
- */
-public class P1ForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, P1MessageWritable>{
- public static final String KMER_SIZE = "NaiveAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private P1MessageWritable incomingMsg = new P1MessageWritable();
- private P1MessageWritable outgoingMsg = new P1MessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- outgoingMsg.reset();
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- destVertexId.set(getDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
- }
- }
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, outgoingMsg);
- }
- }
- }
- /**
- * start sending message
- */
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- }
- if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- }
- }
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<P1MessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
- /**
- * set vertex state
- */
- public void setState(){
- if(incomingMsg.getMessage() == Message.START){
- getVertexValue().setState(State.START_VERTEX);
- }
- else if(incomingMsg.getMessage() == Message.END
- && getVertexValue().getState() != State.START_VERTEX){
- getVertexValue().setState(State.END_VERTEX);
- voteToHalt();
- }
- else
- voteToHalt();
- }
- /**
- * head node sends message to path node
- */
- public void sendMsgToPathVertex(Iterator<P1MessageWritable> msgIterator){
- if(getSuperstep() == 3){
- getVertexValue().setMergeChain(getVertexId());
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
- getVertexValue().getAdjMap()));
- sendMsg(destVertexId,outgoingMsg);
- }else{
- while (msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- if(incomingMsg.getMessage() != Message.STOP){
- getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- outgoingMsg.setSourceVertexId(getVertexId());
- destVertexId.set(getDestVertexIdFromChain(getVertexValue().getMergeChain(),
- incomingMsg.getAdjMap()));
- sendMsg(destVertexId,outgoingMsg);
- }
- else{
- getVertexValue().setMergeChain(kmerFactory.mergeKmerWithNextCode(getVertexValue().getMergeChain(),
- incomingMsg.getLastGeneCode()));
- byte adjMap = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
- incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(adjMap);
- getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
- //System.out.println();
- }
- }
- }
- }
- /**
- * path node sends message back to head node
- */
- public void responseMsgToHeadVertex(){
- deleteVertex(getVertexId());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- outgoingMsg.setLastGeneCode(getVertexId().getGeneCodeAtPosition(kmerSize - 1));
- if(getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.STOP);
- sendMsg(incomingMsg.getSourceVertexId(),outgoingMsg);
- }
-
- @Override
- public void compute(Iterator<P1MessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1) {
- startSendMsg();
- voteToHalt();
- }
- else if(getSuperstep() == 2)
- initState(msgIterator);
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- sendMsgToPathVertex(msgIterator);
- voteToHalt();
- }
- else if(getSuperstep()%2 == 0 && getSuperstep() > 2 && getSuperstep() <= maxIteration){
- while(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- voteToHalt();
- }
- }
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(NaiveAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P1ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput
- */
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java
deleted file mode 100644
index 7d5263d..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/P2ForPathMergeVertex.java
+++ /dev/null
@@ -1,274 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class P2ForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
- public static final String KMER_SIZE = "TwoStepLogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "TwoStepLogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private LogAlgorithmMessageWritable incomingMsg = new LogAlgorithmMessageWritable();
- private LogAlgorithmMessageWritable outgoingMsg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- outgoingMsg.reset();
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
- VKmerBytesWritable lastKmer = kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId);
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- sendMsg(getNextDestVertexId(vertexId, x), outgoingMsg);
- }
- }
- }
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
- sendMsg(getPreDestVertexId(vertexId, x), outgoingMsg);
- }
- }
- }
- /**
- * start sending message
- */
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- outgoingMsg.setMessage(Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- outgoingMsg.setMessage(Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- }
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- incomingMsg = msgIterator.next();
- setState();
- }
- }
- }
- /**
- * set vertex state
- */
- public void setState(){
- if(incomingMsg.getMessage() == Message.START){
- getVertexValue().setState(State.START_VERTEX);
- getVertexValue().setMergeChain(null);
- }
- else if(incomingMsg.getMessage() == Message.END && getVertexValue().getState() != State.START_VERTEX){
- getVertexValue().setState(State.END_VERTEX);
- getVertexValue().setMergeChain(getVertexId());
- voteToHalt();
- }
- else
- voteToHalt();
- }
- /**
- * head send message to path
- */
- public void sendOutMsg(KmerBytesWritable chainVertexId, byte adjMap){
- if(getVertexValue().getState() == State.START_VERTEX){
- outgoingMsg.setMessage(Message.START);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
- }
- else if(getVertexValue().getState() != State.END_VERTEX){
- outgoingMsg.setMessage(Message.NON);
- outgoingMsg.setSourceVertexId(getVertexId());
- sendMsg(getNextDestVertexIdFromBitmap(chainVertexId, adjMap), outgoingMsg);
- }
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
- getVertexValue().setMergeChain(getVertexId());
- sendOutMsg(getVertexId(), getVertexValue().getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- if(mergeChainVertex(msgIterator)){
- if(incomingMsg.getMessage() == Message.END){
- if(getVertexValue().getState() == State.START_VERTEX){
- getVertexValue().setState(State.FINAL_VERTEX);
- //String source = getVertexValue().getMergeChain().toString();
- //System.out.println();
- }
- else
- getVertexValue().setState(State.END_VERTEX);
- }
- else
- sendOutMsg(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
- }
- }
- }
- }
- /**
- * path response message to head
- */
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
- incomingMsg = msgIterator.next();
- outgoingMsg.setChainVertexId(getVertexValue().getMergeChain());
- outgoingMsg.setAdjMap(getVertexValue().getAdjMap());
- if(getVertexValue().getState() == State.END_VERTEX)
- outgoingMsg.setMessage(Message.END);
- sendMsg(incomingMsg.getSourceVertexId(),outgoingMsg);
-
- if(incomingMsg.getMessage() == Message.START)
- deleteVertex(getVertexId());
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX)
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
- }
- }
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- //merge chain
- lastKmer.set(kmerFactory.getLastKmerFromChain(incomingMsg.getLengthOfChain() - kmerSize + 1,
- incomingMsg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(getVertexValue().getMergeChain(),
- lastKmer));
- if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
- getVertexValue().setMergeChain(null);
- getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
- chainVertexId.getGeneCodeAtPosition(kmerSize)));
- getVertexValue().setState(State.CYCLE);
- return false;
- }
- else
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(),
- incomingMsg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
- return true;
- }
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if(getSuperstep() == 2)
- initState(msgIterator);
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- sendMsgToPathVertex(msgIterator);
- voteToHalt();
- }
- else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
- responseMsgToHeadVertex(msgIterator);
- voteToHalt();
- }
- else
- voteToHalt();
- }
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(TwoStepLogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(P2ForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
deleted file mode 100644
index 7946460..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
+++ /dev/null
@@ -1,362 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class ThreeStepLogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
- public static final String KMER_SIZE = "ThreeStepLogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "ThreeStepLogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 20);
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- destVertexId.set(getNextDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
-
- /**
- * set vertex state
- */
- public void setState(){
- if(msg.getMessage() == Message.START &&
- (getVertexValue().getState() == State.MID_VERTEX || getVertexValue().getState() == State.END_VERTEX)){
- getVertexValue().setState(State.START_VERTEX);
- setVertexValue(getVertexValue());
- }
- else if(msg.getMessage() == Message.END && getVertexValue().getState() == State.MID_VERTEX){
- getVertexValue().setState(State.END_VERTEX);
- setVertexValue(getVertexValue());
- voteToHalt();
- }
- else
- voteToHalt();
- }
- /**
- * send start message to next node
- */
- public void sendStartMsgToNextNode(){
- msg.setMessage(Message.START);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send end message to next node
- */
- public void sendEndMsgToNextNode(){
- msg.setMessage(Message.END);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send non message to next node
- */
- public void sendNonMsgToNextNode(){
- msg.setMessage(Message.NON);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
- if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
- voteToHalt();
- else{
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
- if(getVertexValue().getState() == State.START_VERTEX){
- sendStartMsgToNextNode();
- }
- else if(getVertexValue().getState() != State.END_VERTEX){
- sendEndMsgToNextNode();
- }
- }
- }
- /**
- * path send message to head
- */
- public void responseMsgToHeadVertex(){
- if(getVertexValue().getLengthOfMergeChain() == -1){
- getVertexValue().setMergeChain(getVertexId());
- setVertexValue(getVertexValue());
- }
- msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
- //msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
- setMessageType(msg.getMessage());
- destVertexId.set(msg.getSourceVertexId());
- sendMsg(destVertexId,msg);
- }
- /**
- * set message type
- */
- public void setMessageType(int message){
- //kill Message because it has been merged by the head
- if(getVertexValue().getState() == State.END_VERTEX){
- msg.setMessage(Message.END);
- getVertexValue().setState(State.END_VERTEX);
- setVertexValue(getVertexValue());
- //deleteVertex(getVertexId());
- }
- else
- msg.setMessage(Message.NON);
-
- if(message == Message.START){
- getVertexValue().setState(State.TODELETE);
- setVertexValue(getVertexValue());
- }
- }
- /**
- * set vertexValue's state chainVertexId, value
- */
- public void setVertexValueAttributes(){
- if(msg.getMessage() == Message.END){
- if(getVertexValue().getState() != State.START_VERTEX)
- getVertexValue().setState(State.END_VERTEX);
- else
- getVertexValue().setState(State.FINAL_VERTEX);
- }
-
- if(getSuperstep() == 5)
- chainVertexId.set(getVertexId());
- else
- chainVertexId.set(getVertexValue().getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
- }
- /**
- * send message to self
- */
- public void sendMsgToSelf(){
- if(msg.getMessage() != Message.END){
- setVertexValue(getVertexValue());
- msg.reset(); //reset
- msg.setAdjMap(getVertexValue().getAdjMap());
- sendMsg(getVertexId(),msg);
- }
- }
- /**
- * start sending message
- */
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.START);
- //msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.END);
- //msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- voteToHalt();
- }
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- getVertexValue().setState(State.MID_VERTEX);
- setVertexValue(getVertexValue());
- }
- }
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- msg = msgIterator.next();
- setState();
- }
- }
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
- msg.reset();
- sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- sendMsgToPathVertex(getVertexValue().getMergeChain(), msg.getAdjMap());
- }
- }
- }
- /**
- * path response message to head
- */
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX){
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
- }
- }
- }
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- setVertexValueAttributes();
- sendMsgToSelf();
- }
- if(getVertexValue().getState() == State.END_VERTEX){
- voteToHalt();
- }
- if(getVertexValue().getState() == State.FINAL_VERTEX){
- //String source = getVertexValue().getMergeChain().toString();
- voteToHalt();
- }
- }
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- initVertex();
- if (getSuperstep() == 1)
- startSendMsg();
- else if(getSuperstep() == 2)
- initState(msgIterator);
- else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
- sendMsgToPathVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
- responseMsgToHeadVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
- if(getVertexValue().getState() == State.TODELETE){
- deleteVertex(getVertexId()); //killSelf
- }
- else{
- mergeChainVertex(msgIterator);
- }
- }
- else
- voteToHalt();
- }
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(ThreeStepLogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(ThreeStepLogAlgorithmForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
deleted file mode 100644
index 6b67333..0000000
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
+++ /dev/null
@@ -1,367 +0,0 @@
-package edu.uci.ics.genomix.pregelix.operator;
-
-import java.util.Iterator;
-import java.util.logging.Logger;
-
-import org.apache.hadoop.io.NullWritable;
-
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.genomix.pregelix.client.Client;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.LogAlgorithmMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.Message;
-import edu.uci.ics.genomix.pregelix.type.State;
-import edu.uci.ics.genomix.pregelix.util.GraphVertexOperation;
-import edu.uci.ics.genomix.type.GeneCode;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritable;
-import edu.uci.ics.genomix.type.VKmerBytesWritableFactory;
-/*
- * vertexId: BytesWritable
- * vertexValue: ValueStateWritable
- * edgeValue: NullWritable
- * message: LogAlgorithmMessageWritable
- *
- * DNA:
- * A: 00
- * C: 01
- * G: 10
- * T: 11
- *
- * succeed node
- * A 00000001 1
- * G 00000010 2
- * C 00000100 4
- * T 00001000 8
- * precursor node
- * A 00010000 16
- * G 00100000 32
- * C 01000000 64
- * T 10000000 128
- *
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
- * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
- * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
- */
-public class TwoStepLogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
- public static Logger logger = Logger.getLogger(TwoStepLogAlgorithmForPathMergeVertex.class.getName());
-
- public static final String KMER_SIZE = "TwoStepLogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "TwoStepLogAlgorithmForPathMergeVertex.iteration";
- public static int kmerSize = -1;
- private int maxIteration = -1;
-
- private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
- /**
- * initiate kmerSize, maxIteration
- */
- public void initVertex(){
- if(kmerSize == -1)
- kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
- }
- /**
- * get destination vertex
- */
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
- return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
- }
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
- }
- /**
- * head send message to all next nodes
- */
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
- destVertexId.set(getNextDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
- /**
- * head send message to all previous nodes
- */
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
- destVertexId.set(getPreDestVertexId(vertexId, x));
- sendMsg(destVertexId, msg);
- }
- }
- }
- /**
- * set vertex state
- */
- public void setState(){
- if(msg.getMessage() == Message.START &&
- (getVertexValue().getState() == State.MID_VERTEX || getVertexValue().getState() == State.END_VERTEX)){
- getVertexValue().setState(State.START_VERTEX);
- setVertexValue(getVertexValue());
- }
- else if(msg.getMessage() == Message.END && getVertexValue().getState() == State.MID_VERTEX){
- getVertexValue().setState(State.END_VERTEX);
- setVertexValue(getVertexValue());
- voteToHalt();
- }
- else
- voteToHalt();
- }
- /**
- * send start message to next node
- */
- public void sendStartMsgToNextNode(){
- msg.reset();
- msg.setMessage(Message.START);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send end message to next node
- */
- public void sendEndMsgToNextNode(){
- msg.reset();
- msg.setMessage(Message.END);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * send non message to next node
- */
- public void sendNonMsgToNextNode(){
- msg.reset();
- msg.setMessage(Message.NON);
- msg.setSourceVertexId(getVertexId());
- sendMsg(destVertexId, msg);
- voteToHalt();
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
- //if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1
- // || getVertexValue().getState() == State.FINAL_VERTEX) //|| lastKmer == null
- // voteToHalt();
- //else{
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
- if(getVertexValue().getState() == State.START_VERTEX){
- sendStartMsgToNextNode();
- }
- else if(getVertexValue().getState() != State.END_VERTEX){ //FINAL_DELETE
- sendNonMsgToNextNode();//sendEndMsgToNextNode();
- }
- //}
- }
- /**
- * path send message to head
- */
- public void responseMsgToHeadVertex(){
- if(getVertexValue().getLengthOfMergeChain() == 0){
- getVertexValue().setMergeChain(getVertexId());
- setVertexValue(getVertexValue());
- }
- destVertexId.set(msg.getSourceVertexId());
- msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
- setMessageType(msg.getMessage());
- sendMsg(destVertexId,msg);
- }
- /**
- * set message type
- */
- public void setMessageType(int message){
- //kill Message because it has been merged by the head
- if(getVertexValue().getState() == State.END_VERTEX){ //FINAL_DELETE
- msg.setMessage(Message.END);
- getVertexValue().setState(State.END_VERTEX); //FINAL_DELETE
- setVertexValue(getVertexValue());
- }
- else
- msg.setMessage(Message.NON);
-
- if(message == Message.START){
- deleteVertex(getVertexId());
- }
- }
- /**
- * set vertexValue's state chainVertexId, value
- */
- public boolean setVertexValueAttributes(){
- if(msg.getMessage() == Message.END){
- if(getVertexValue().getState() != State.START_VERTEX)
- getVertexValue().setState(State.END_VERTEX);
- else
- getVertexValue().setState(State.FINAL_VERTEX);
- }
-
- if(getSuperstep() == 5)
- chainVertexId.set(getVertexId());
- else
- chainVertexId.set(getVertexValue().getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
- chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- if(GraphVertexOperation.isCycle(getVertexId(), chainVertexId)){
- getVertexValue().setMergeChain(null);
- getVertexValue().setAdjMap(GraphVertexOperation.reverseAdjMap(getVertexValue().getAdjMap(),
- chainVertexId.getGeneCodeAtPosition(kmerSize)));
- getVertexValue().setState(State.CYCLE);
- return false;
- }
- else
- getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
- getVertexValue().setAdjMap(tmpVertexValue);
- return true;
- }
- /**
- * send message to self
- */
- public void sendMsgToSelf(){
- if(msg.getMessage() != Message.END){
- setVertexValue(getVertexValue());
- msg.reset(); //reset
- msg.setAdjMap(getVertexValue().getAdjMap());
- sendMsg(getVertexId(),msg);
- }
- }
- /**
- * start sending message
- */
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.START);
- sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
- //voteToHalt();
- }
- if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(null, null, (byte)0, Message.END);
- sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
- //voteToHalt();
- }
- if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- getVertexValue().setState(State.MID_VERTEX);
- setVertexValue(getVertexValue());
- }
- else
- voteToHalt();
- }
- /**
- * initiate head, rear and path node
- */
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
- msgIterator.next();
- voteToHalt();
- }
- else{
- msg = msgIterator.next();
- setState();
- }
- }
- }
- /**
- * head send message to path
- */
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
- sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- if(mergeChainVertex(msgIterator))
- sendMsgToPathVertex(getVertexValue().getMergeChain(), getVertexValue().getAdjMap());
- else
- voteToHalt();
- }
- if(getVertexValue().getState() == State.END_VERTEX){ //FINAL_DELETE
- voteToHalt();
- }
- if(getVertexValue().getState() == State.FINAL_VERTEX){
- //String source = getVertexValue().getMergeChain().toString();
- voteToHalt();
- }
- }
- }
- /**
- * path response message to head
- */
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
- msg = msgIterator.next();
- responseMsgToHeadVertex();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX){ //FINAL_DELETE
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
- }
- }
- }
- /**
- * merge chainVertex and store in vertexVal.chainVertexId
- */
- public boolean mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- return setVertexValueAttributes();
- }
- @Override
- public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- initVertex();
- if(getVertexValue().getState() == State.FINAL_VERTEX)
- voteToHalt();
- else{
- if (getSuperstep() == 1)
- startSendMsg();
- else if(getSuperstep() == 2)
- initState(msgIterator);
- else if(getSuperstep()%2 == 1 && getSuperstep() <= maxIteration){
- sendMsgToPathVertex(msgIterator);
- }
- else if(getSuperstep()%2 == 0 && getSuperstep() <= maxIteration){
- responseMsgToHeadVertex(msgIterator);
- }
- else
- voteToHalt();
- }
- }
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(TwoStepLogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(TwoStepLogAlgorithmForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
- }
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
deleted file mode 100644
index bead712..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/FilterJobGenerator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package edu.uci.ics.genomix.pregelix.JobGen;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.format.NaiveAlgorithmForPathMergeOutputFormat;
-import edu.uci.ics.genomix.pregelix.format.LogAlgorithmForPathMergeInputFormat;
-import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.operator.LogFilterVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.pregelix.operator.NaiveFilterVertex;
-import edu.uci.ics.genomix.pregelix.operator.TwoStepLogAlgorithmForPathMergeVertex;
-import edu.uci.ics.genomix.type.KmerBytesWritable;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-
-
-public class FilterJobGenerator {
-
- public static String outputBase = "src/test/resources/jobs/";
-
- private static void generateNaiveFilterJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(NaiveFilterVertex.class);
- job.setVertexInputFormatClass(NaiveAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(NaiveAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 55);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genNaiveFilter() throws IOException {
- generateNaiveFilterJob("NaiveFilterVertex", outputBase + "NaiveFilterVertex.xml");
- }
-
- private static void generateLogFilterJob(String jobName, String outputPath) throws IOException {
- PregelixJob job = new PregelixJob(jobName);
- job.setVertexClass(LogFilterVertex.class);
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setDynamicVertexValueSize(true);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.getConfiguration().setInt(TwoStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
- job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
- }
-
- private static void genLogFilter() throws IOException {
- generateLogFilterJob("LogFilterVertex", outputBase + "LogFilterVertex.xml");
- }
-
- /**
- * @param args
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- genNaiveFilter();
- //genLogFilter();
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java
deleted file mode 100644
index a5ddce3..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestCase.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package edu.uci.ics.genomix.pregelix.JobRun;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.pregelix.example.util.TestUtils;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
-import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
-
-public class RunJobTestCase extends TestCase{
-
- private static final String NC1 = "nc1";
- private static final String HYRACKS_APP_NAME = "pregelix";
- private static String HDFS_INPUTPATH = "/webmap";
- private static String HDFS_OUTPUTPAH = "/result";
-
- private final PregelixJob job;
- private JobGen[] giraphJobGens;
- private final String resultFileName;
- private final String expectedFileName;
- private final String jobFile;
-
-
-
- public RunJobTestCase(String hadoopConfPath, String jobName, String jobFile, String resultFile, String expectedFile)
- throws Exception {
- super("test");
- this.jobFile = jobFile;
- this.job = new PregelixJob("test");
- this.job.getConfiguration().addResource(new Path(jobFile));
- this.job.getConfiguration().addResource(new Path(hadoopConfPath));
- Path[] inputPaths = FileInputFormat.getInputPaths(job);
- if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH)) {
- FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
- FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
- }
-
- job.setJobName(jobName);
- this.resultFileName = resultFile;
- this.expectedFileName = expectedFile;
- giraphJobGens = new JobGen[1];
- giraphJobGens[0] = new JobGenOuterJoin(job);
- /*waitawhile();
- giraphJobGens[1] = new JobGenInnerJoin(job);
- waitawhile();
- giraphJobGens[2] = new JobGenOuterJoinSort(job);
- waitawhile();
- giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);*/
- }
-
- private void waitawhile() throws InterruptedException {
- synchronized (this) {
- this.wait(20);
- }
- }
- @Test
- public void test() throws Exception {
- setUp();
-
- for (JobGen jobGen : giraphJobGens) {
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(new Path(HDFS_OUTPUTPAH), true);
- runCreate(jobGen);
- runDataLoad(jobGen);
- int i = 1;
- boolean terminate = false;
- do {
- runLoopBodyIteration(jobGen, i);
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
- runIndexScan(jobGen);
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
- compareResults();
- }
- tearDown();
- waitawhile();
- }
-
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification treeCreateJobSpec = jobGen.generateCreatingJob();
- PregelixHyracksIntegrationUtil.runJob(treeCreateJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runDataLoad(JobGen jobGen) throws Exception {
- try {
- JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- PregelixHyracksIntegrationUtil.runJob(bulkLoadJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
- try {
- JobSpecification loopBody = jobGen.generateJob(iteration);
- PregelixHyracksIntegrationUtil.runJob(loopBody, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runIndexScan(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen.scanIndexPrintGraph(NC1, resultFileName);
- PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runHDFSWRite(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
- PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runCleanup(JobGen jobGen) throws Exception {
- try {
- JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runJobArray(JobSpecification[] jobs) throws Exception {
- for (JobSpecification job : jobs) {
- PregelixHyracksIntegrationUtil.runJob(job, HYRACKS_APP_NAME);
- }
- }
-
- private void compareResults() throws Exception {
- TestUtils.compareWithResult(new File(resultFileName), new File(expectedFileName));
- }
-
- public String toString() {
- return jobFile;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
deleted file mode 100644
index 1af0d6e..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package edu.uci.ics.genomix.pregelix.JobRun;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import junit.framework.Test;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-
-public class RunJobTestSuite extends TestSuite {
-
- private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
- .getName());
-
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String FILE_EXTENSION_OF_RESULTS = "result";
-
- private static final String DATA_PATH = "data/sequencefile/Path";
- private static final String HDFS_PATH = "/webmap/";
-
- private static final String HYRACKS_APP_NAME = "pregelix";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
- + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
-
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
-
- DataOutputStream confOutput = new DataOutputStream(
- new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
-
- public static Test suite() throws Exception {
- List<String> ignores = getFileList(PATH_TO_IGNORE);
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- RunJobTestSuite testSuite = new RunJobTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
-
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
- for (File qFile : queries) {
- if (isInList(ignores, qFile.getName()))
- continue;
-
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- String resultFileName = ACTUAL_RESULT_DIR + File.separator
- + jobExtToResExt(qFile.getName());
- String expectedFileName = EXPECTED_RESULT_DIR
- + File.separator + jobExtToResExt(qFile.getName());
- testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
- qFile.getName(),
- qFile.getAbsolutePath().toString(), resultFileName,
- expectedFileName));
- }
- }
- }
- return testSuite;
- }
-
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
-
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
-
- }
-
- protected static List<String> getFileList(String ignorePath)
- throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
-
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
- }
-
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
-
- public JobConf getConf() {
- return conf;
- }
-
- public void setConf(JobConf conf) {
- this.conf = conf;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
deleted file mode 100644
index 4b6d367..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestCase.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-
-import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.core.base.IDriver.Plan;
-import edu.uci.ics.pregelix.core.driver.Driver;
-import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-
-public class PathMergeSmallTestCase extends TestCase {
- private final PregelixJob job;
- private final String resultFileDir;
- private final String textFileDir;
- private final String jobFile;
- private final Driver driver = new Driver(this.getClass());
- private final FileSystem dfs;
-
- public PathMergeSmallTestCase(String hadoopConfPath, String jobName,
- String jobFile, FileSystem dfs, String hdfsInput, String resultFile, String textFile)
- throws Exception {
- super("test");
- this.jobFile = jobFile;
- this.job = new PregelixJob("test");
- this.job.getConfiguration().addResource(new Path(jobFile));
- this.job.getConfiguration().addResource(new Path(hadoopConfPath));
- FileInputFormat.setInputPaths(job, hdfsInput);
- FileOutputFormat.setOutputPath(job, new Path(hdfsInput + "_result"));
- this.textFileDir = textFile;
- job.setJobName(jobName);
- this.resultFileDir = resultFile;
-
- this.dfs = dfs;
- }
-
- private void waitawhile() throws InterruptedException {
- synchronized (this) {
- this.wait(20);
- }
- }
-
- @Test
- public void test() throws Exception {
- setUp();
- Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
- for (Plan plan : plans) {
- driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
- PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT,
- false);
- }
- compareResults();
- tearDown();
- waitawhile();
- }
-
- private void compareResults() throws Exception {
- dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(
- resultFileDir));
- GenerateTextFile.generateFromPathmergeResult(55, resultFileDir, textFileDir);
- // TestUtils.compareWithResultDir(new File(expectedFileDir), new
- // File(resultFileDir));
- }
-
- public String toString() {
- return jobFile;
- }
-
-}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
deleted file mode 100644
index 898d059..0000000
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/pathmerge/PathMergeSmallTestSuite.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.genomix.pregelix.pathmerge;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import junit.framework.Test;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-
-@SuppressWarnings("deprecation")
-public class PathMergeSmallTestSuite extends TestSuite {
- private static final Logger LOGGER = Logger
- .getLogger(PathMergeSmallTestSuite.class.getName());
-
- public static final String PreFix = "graphbuildresult"; //"graphbuildresult";
- public static final String[] TestDir = { PreFix + File.separator
- + "result"};
- /*+ "BridgePath", PreFix + File.separator
- + "CyclePath", PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "TreePath"};
- + "2"}, PreFix + File.separator
- + "3", PreFix + File.separator
- + "4", PreFix + File.separator
- + "5", PreFix + File.separator
- + "6", PreFix + File.separator
- + "7", PreFix + File.separator
- + "8", PreFix + File.separator
- + "9", PreFix + File.separator
- + "TwoKmer", PreFix + File.separator
- + "ThreeKmer", PreFix + File.separator
- + "SinglePath", PreFix + File.separator
- + "SimplePath", PreFix + File.separator
- + "Path", PreFix + File.separator
- + "BridgePath", PreFix + File.separator
- + "CyclePath", PreFix + File.separator
- + "RingPath", PreFix + File.separator
- + "LongPath", PreFix + File.separator
- + "TreePath"};*/
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
-
- public static final String HDFS_INPUTPATH = "/PathTestSet";
-
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
- + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
-
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
-
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
-
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
-
- for (String testDir : TestDir) {
- File src = new File(testDir);
- Path dest = new Path(HDFS_INPUTPATH + File.separator + src.getName());
- dfs.mkdirs(dest);
- //src.listFiles()
- //src.listFiles((FilenameFilter)(new WildcardFileFilter("part*")))
- for (File f : src.listFiles()){
- dfs.copyFromLocalFile(new Path(f.getAbsolutePath()), dest);
- }
- }
-
- DataOutputStream confOutput = new DataOutputStream(
- new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
-
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
-
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
-
- public static Test suite() throws Exception {
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- PathMergeSmallTestSuite testSuite = new PathMergeSmallTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
- FileSystem dfs = FileSystem.get(testSuite.conf);
-
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
-
- for (File qFile : queries) {
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- for (String testPathStr : TestDir) {
- File testDir = new File(testPathStr);
- String resultFileName = ACTUAL_RESULT_DIR
- + File.separator
- + jobExtToResExt(qFile.getName())
- + File.separator + "BinaryOutput"
- + File.separator + testDir.getName();
- String textFileName = ACTUAL_RESULT_DIR
- + File.separator
- + jobExtToResExt(qFile.getName())
- + File.separator + "TextOutput"
- + File.separator + testDir.getName();
- testSuite.addTest(new PathMergeSmallTestCase(
- HADOOP_CONF_PATH, qFile.getName(), qFile
- .getAbsolutePath().toString(),
- dfs, HDFS_INPUTPATH + File.separator + testDir.getName(),
- resultFileName, textFileName));
- }
- }
- }
- }
- return testSuite;
- }
-
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- protected static List<String> getFileList(String ignorePath)
- throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
-
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot);
- }
-
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
-
-}