delete test code
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
index 29c5ccb..8075238 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeOutputFormat.java
@@ -39,10 +39,7 @@
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
if(vertex.getVertexValue().getState() != State.FINAL_DELETE
- && vertex.getVertexValue().getState() != State.END_VERTEX
- && vertex.getVertexValue().getState() != State.TODELETE
- && vertex.getVertexValue().getState() != State.KILL_SELF
- && vertex.getVertexValue().getState() != State.NON_EXIST)
+ && vertex.getVertexValue().getState() != State.END_VERTEX)
getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
index 482014e..983112b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/NaiveAlgorithmForPathMergeOutputFormat.java
@@ -8,7 +8,6 @@
import edu.uci.ics.genomix.pregelix.api.io.binary.BinaryVertexOutputFormat;
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
-import edu.uci.ics.genomix.pregelix.type.State;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -36,8 +35,7 @@
@Override
public void writeVertex(Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
InterruptedException {
- if(vertex.getVertexValue().getState() != State.NON_EXIST)
- getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
+ getRecordWriter().write(vertex.getVertexId(),vertex.getVertexValue());
}
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 108a2d5..739bacb 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -10,51 +10,41 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
-public class LogAlgorithmMessageWritable implements
- WritableComparable<LogAlgorithmMessageWritable> {
+public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
/**
* sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA file stores the point to
- * the file that stores the chains of connected DNA
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA
+ * file stores the point to the file that stores the chains of connected DNA
*/
- private VKmerBytesWritable sourceVertexId;
+ private KmerBytesWritable sourceVertexId;
private VKmerBytesWritable chainVertexId;
private byte adjMap;
private int message;
private int sourceVertexState;
-
- public LogAlgorithmMessageWritable() {
- if (LogAlgorithmForPathMergeVertex.kmerSize > 0) {
- sourceVertexId = new VKmerBytesWritable(
- LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(
- LogAlgorithmForPathMergeVertex.kmerSize);
- } else {
- sourceVertexId = new VKmerBytesWritable(55);
- chainVertexId = new VKmerBytesWritable(55);
- }
+
+ public LogAlgorithmMessageWritable(){
+ sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
}
-
- public void set(KmerBytesWritable sourceVertexId,
- KmerBytesWritable chainVertexId, byte adjMap, int message,
- int sourceVertexState) {
+
+ public void set(KmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
this.sourceVertexId.set(sourceVertexId);
this.chainVertexId.set(chainVertexId);
this.adjMap = adjMap;
this.message = message;
this.sourceVertexState = sourceVertexState;
}
-
- public void reset() {
- // sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId.reset(0);
- adjMap = (byte) 0;
+
+ public void reset(){
+ //sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ adjMap = (byte)0;
message = 0;
sourceVertexState = 0;
}
- public VKmerBytesWritable getSourceVertexId() {
+ public KmerBytesWritable getSourceVertexId() {
return sourceVertexId;
}
@@ -97,7 +87,7 @@
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
sourceVertexId.write(out);
@@ -116,25 +106,25 @@
sourceVertexState = in.readInt();
}
- @Override
- public int hashCode() {
- return chainVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof LogAlgorithmMessageWritable) {
- LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId.equals(tp.chainVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return chainVertexId.toString();
- }
-
+ @Override
+ public int hashCode() {
+ return chainVertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NaiveAlgorithmMessageWritable) {
+ LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
+ return chainVertexId.equals(tp.chainVertexId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return chainVertexId.toString();
+ }
+
@Override
public int compareTo(LogAlgorithmMessageWritable tp) {
return chainVertexId.compareTo(tp.chainVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index a10014a..769277c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -19,11 +19,7 @@
public ValueStateWritable() {
state = State.NON_VERTEX;
- if (NaiveAlgorithmForPathMergeVertex.kmerSize > 0){
- mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
- }else{
- mergeChain = new VKmerBytesWritable(55);
- }
+ mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
}
public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index 7b2f2ca..398d7ff 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -24,13 +24,13 @@
* vertexValue: ValueStateWritable
* edgeValue: NullWritable
* message: LogAlgorithmMessageWritable
- *
+ *
* DNA:
* A: 00
* C: 01
* G: 10
* T: 11
- *
+ *
* succeed node
* A 00000001 1
* G 00000010 2
@@ -42,83 +42,69 @@
* C 01000000 64
* T 10000000 128
*
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
* That means that vertexId is ACG, its succeed node is A and its precursor node is C.
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class LogAlgorithmForPathMergeVertex
- extends
- Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
-
+public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
+ public static final String ITERATIONS = "LogAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
private int maxIteration = -1;
-
+
private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(
- 1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
-
/**
* initiate kmerSize, maxIteration
*/
- public void initVertex() {
- if (kmerSize == -1)
+ public void initVertex(){
+ if(kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS,
- 100);
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
}
/**
* get destination vertex
*/
- public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId,
- byte geneCode) {
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId, byte geneCode){
return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId,
- byte geneCode) {
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId, byte geneCode){
return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(
- KmerBytesWritable chainVertexId, byte adjMap) {
- return getDestVertexIdFromChain(chainVertexId, adjMap);// GeneCode.getGeneCodeFromBitMap((byte)(adjMap
- // & 0x0F)
- }
-
- public VKmerBytesWritable getDestVertexIdFromChain(
- KmerBytesWritable chainVertexId, byte adjMap) {
- lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer,
- GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(KmerBytesWritable chainVertexId, byte adjMap){
+ return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
}
+ public VKmerBytesWritable getDestVertexIdFromChain(KmerBytesWritable chainVertexId, byte adjMap){
+ lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
+ return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ }
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if ((adjMap & (1 << x)) != 0) {
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if((adjMap & (1 << x)) != 0){
destVertexId.set(getNextDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
}
}
-
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId,
- byte adjMap) {
- for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
- if (((adjMap >> 4) & (1 << x)) != 0) {
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId, byte adjMap){
+ for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
+ if(((adjMap >> 4) & (1 << x)) != 0){
destVertexId.set(getPreDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
@@ -128,282 +114,245 @@
/**
* set vertex state
*/
- public void setState() {
- if (msg.getMessage() == Message.START
- && (getVertexValue().getState() == State.MID_VERTEX || getVertexValue()
- .getState() == State.END_VERTEX)) {
+ public void setState(){
+ if(msg.getMessage() == Message.START &&
+ (getVertexValue().getState() == State.MID_VERTEX || getVertexValue().getState() == State.END_VERTEX)){
getVertexValue().setState(State.START_VERTEX);
setVertexValue(getVertexValue());
- } else if (msg.getMessage() == Message.END
- && getVertexValue().getState() == State.MID_VERTEX) {
+ }
+ else if(msg.getMessage() == Message.END && getVertexValue().getState() == State.MID_VERTEX){
getVertexValue().setState(State.END_VERTEX);
setVertexValue(getVertexValue());
voteToHalt();
- } else
+ }
+ else
voteToHalt();
}
-
/**
* send start message to next node
*/
- public void sendStartMsgToNextNode() {
+ public void sendStartMsgToNextNode(){
msg.setMessage(Message.START);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
-
/**
* send end message to next node
*/
- public void sendEndMsgToNextNode() {
+ public void sendEndMsgToNextNode(){
msg.setMessage(Message.END);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
-
/**
* send non message to next node
*/
- public void sendNonMsgToNextNode() {
+ public void sendNonMsgToNextNode(){
msg.setMessage(Message.NON);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
}
-
/**
* head send message to path
*/
- public void sendMsgToPathVertex(KmerBytesWritable chainVertexId,
- byte adjMap) {
- if (GeneCode
- .getGeneCodeFromBitMap((byte) (getVertexValue().getAdjMap() & 0x0F)) == -1) // ||// lastKmer == null
-
+ public void sendMsgToPathVertex(KmerBytesWritable chainVertexId, byte adjMap){
+ if(GeneCode.getGeneCodeFromBitMap((byte)(getVertexValue().getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
voteToHalt();
- else {
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId,
- adjMap));
- if (getVertexValue().getState() == State.START_VERTEX) {
+ else{
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
+ if(getVertexValue().getState() == State.START_VERTEX){
sendStartMsgToNextNode();
- } else if (getVertexValue().getState() != State.END_VERTEX
- && getVertexValue().getState() != State.FINAL_DELETE) {
+ }
+ else if(getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
sendEndMsgToNextNode();
}
}
}
-
/**
- * path send message to head
+ * path send message to head
*/
- public void responseMsgToHeadVertex() {
- if (getVertexValue().getLengthOfMergeChain() == -1) {
+ public void responseMsgToHeadVertex(){
+ if(getVertexValue().getLengthOfMergeChain() == -1){
getVertexValue().setMergeChain(getVertexId());
setVertexValue(getVertexValue());
}
- msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(),
- getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
+ msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
setMessageType(msg.getMessage());
destVertexId.set(msg.getSourceVertexId());
- sendMsg(destVertexId, msg);
+ sendMsg(destVertexId,msg);
}
-
/**
* set message type
*/
- public void setMessageType(int message) {
- // kill Message because it has been merged by the head
- if (getVertexValue().getState() == State.END_VERTEX
- || getVertexValue().getState() == State.FINAL_DELETE) {
+ public void setMessageType(int message){
+ //kill Message because it has been merged by the head
+ if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
msg.setMessage(Message.END);
getVertexValue().setState(State.FINAL_DELETE);
setVertexValue(getVertexValue());
- // deleteVertex(getVertexId());
- } else
+ //deleteVertex(getVertexId());
+ }
+ else
msg.setMessage(Message.NON);
-
- if (message == Message.START) {
+
+ if(message == Message.START){
getVertexValue().setState(State.TODELETE);
setVertexValue(getVertexValue());
}
}
-
/**
- * set vertexValue's state chainVertexId, value
+ * set vertexValue's state chainVertexId, value
*/
- public void setVertexValueAttributes() {
- if (msg.getMessage() == Message.END) {
- if (getVertexValue().getState() != State.START_VERTEX)
+ public void setVertexValueAttributes(){
+ if(msg.getMessage() == Message.END){
+ if(getVertexValue().getState() != State.START_VERTEX)
getVertexValue().setState(State.END_VERTEX);
else
getVertexValue().setState(State.FINAL_VERTEX);
}
-
- if (getSuperstep() == 5)
+
+ if(getSuperstep() == 5)
chainVertexId.set(getVertexId());
else
chainVertexId.set(getVertexValue().getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain()
- - kmerSize + 1, msg.getChainVertexId()));
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
getVertexValue().setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(
- getVertexValue().getAdjMap(), msg.getAdjMap());
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
getVertexValue().setAdjMap(tmpVertexValue);
}
-
/**
- * send message to self
+ * send message to self
*/
- public void sendMsgToSelf() {
- if (msg.getMessage() != Message.END) {
+ public void sendMsgToSelf(){
+ if(msg.getMessage() != Message.END){
setVertexValue(getVertexValue());
- msg.reset(); // reset
+ msg.reset(); //reset
msg.setAdjMap(getVertexValue().getAdjMap());
- sendMsg(getVertexId(), msg);
+ sendMsg(getVertexId(),msg);
}
}
-
/**
* start sending message
*/
- public void startSendMsg() {
- if (GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())) {
- msg.set(getVertexId(), chainVertexId, (byte) 0, Message.START,
- State.NON_VERTEX); // msg.set(null, (byte)0, chainVertexId,
- // Message.START, State.NON_VERTEX);
+ public void startSendMsg(){
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
+ msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if (GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())) {
- msg.set(getVertexId(), chainVertexId, (byte) 0, Message.END,
- State.NON_VERTEX);
+ if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
+ msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if (GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
getVertexValue().setState(State.MID_VERTEX);
setVertexValue(getVertexValue());
}
}
-
/**
- * initiate head, rear and path node
+ * initiate head, rear and path node
*/
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator) {
- while (msgIterator.hasNext()) {
- if (!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ while(msgIterator.hasNext()){
+ if(!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
msgIterator.next();
voteToHalt();
- } else {
+ }
+ else{
msg = msgIterator.next();
setState();
}
}
}
-
/**
* head send message to path
*/
- public void sendMsgToPathVertex(
- Iterator<LogAlgorithmMessageWritable> msgIterator) {
- if (getSuperstep() == 3) {
+ public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(getSuperstep() == 3){
msg.reset();
sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
- } else {
- if (msgIterator.hasNext()) {
+ }
+ else{
+ if(msgIterator.hasNext()){
msg = msgIterator.next();
sendMsgToPathVertex(getVertexValue().getMergeChain(), msg.getAdjMap());
}
}
}
-
/**
* path response message to head
*/
- public void responseMsgToHeadVertex(
- Iterator<LogAlgorithmMessageWritable> msgIterator) {
- if (msgIterator.hasNext()) {
+ public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
msg = msgIterator.next();
responseMsgToHeadVertex();
- // voteToHalt();
- } else {
- if (getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX
- && getVertexValue().getState() != State.FINAL_DELETE) {
- // vertexVal.setState(State.KILL_SELF);
- // setVertexValue(vertexVal);
- // voteToHalt();
- deleteVertex(getVertexId());// killSelf because it doesn't
- // receive any message
+ //voteToHalt();
+ }
+ else{
+ if(getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
+ deleteVertex(getVertexId());//killSelf because it doesn't receive any message
}
}
}
-
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public void mergeChainVertex(
- Iterator<LogAlgorithmMessageWritable> msgIterator) {
- if (msgIterator.hasNext()) {
+ public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
+ if(msgIterator.hasNext()){
msg = msgIterator.next();
setVertexValueAttributes();
sendMsgToSelf();
}
- if (getVertexValue().getState() == State.END_VERTEX
- || getVertexValue().getState() == State.FINAL_DELETE) {
+ if(getVertexValue().getState() == State.END_VERTEX || getVertexValue().getState() == State.FINAL_DELETE){
voteToHalt();
}
- if (getVertexValue().getState() == State.FINAL_VERTEX) {
- //String source = vertexVal.getMergeChain().toString();
+ if(getVertexValue().getState() == State.FINAL_VERTEX){
+ //String source = getVertexValue().getMergeChain().toString();
voteToHalt();
}
}
-
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
initVertex();
- if (getVertexValue().getState() != State.NON_EXIST
- && getVertexValue().getState() != State.KILL_SELF) {
- if (getSuperstep() == 1)
- startSendMsg();
- else if (getSuperstep() == 2)
- initState(msgIterator);
- else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
- sendMsgToPathVertex(msgIterator);
- } else if (getSuperstep() % 3 == 1
- && getSuperstep() <= maxIteration) {
- responseMsgToHeadVertex(msgIterator);
- } else if (getSuperstep() % 3 == 2
- && getSuperstep() <= maxIteration) {
- if (getVertexValue().getState() == State.TODELETE) { // || vertexVal.getState() == State.KILL_SELF)
- // vertexVal.setState(State.NON_EXIST);
- // setVertexValue(vertexVal);
- // voteToHalt();
- deleteVertex(getVertexId()); // killSelf
- } else {
- mergeChainVertex(msgIterator);
- }
+ if (getSuperstep() == 1)
+ startSendMsg();
+ else if(getSuperstep() == 2)
+ initState(msgIterator);
+ else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+ sendMsgToPathVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+ responseMsgToHeadVertex(msgIterator);
+ }
+ else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
+ if(getVertexValue().getState() == State.TODELETE){
+ deleteVertex(getVertexId()); //killSelf
+ }
+ else{
+ mergeChainVertex(msgIterator);
}
}
}
-
/**
* @param args
*/
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(
- LogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
+ PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput~/
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
index 6941865..04b8525 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/NaiveAlgorithmForPathMergeVertex.java
@@ -56,8 +56,6 @@
public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
private int maxIteration = -1;
-
- private ValueStateWritable vertexVal = new ValueStateWritable();
private NaiveAlgorithmMessageWritable msg = new NaiveAlgorithmMessageWritable();
@@ -74,7 +72,6 @@
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
if (maxIteration < 0)
maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- vertexVal = getVertexValue();
}
public void findDestination(){
destVertexId.set(msg.getSourceVertexId());
@@ -107,11 +104,11 @@
public void initChainVertex(){
if(!msg.isRear()){
findDestination();
- if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
chainVertexId.set(getVertexId());
- msg.set(getVertexId(), chainVertexId, getVertexId(), vertexVal.getAdjMap(), false);
+ msg.set(getVertexId(), chainVertexId, getVertexId(), getVertexValue().getAdjMap(), false);
sendMsg(destVertexId,msg);
- }else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap()))
+ }else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap()))
voteToHalt();
}
}
@@ -133,22 +130,22 @@
public void responseMsgToHeadVertex(){
if(!msg.isRear()){
findDestination();
- if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ if(GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())){
chainVertexId = kmerFactory.mergeKmerWithNextCode(msg.getChainVertexId(),
getVertexId().getGeneCodeAtPosition(kmerSize - 1));
deleteVertex(getVertexId());
- msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), vertexVal.getAdjMap(), false);
+ msg.set(getVertexId(), chainVertexId, msg.getHeadVertexId(), getVertexValue().getAdjMap(), false);
sendMsg(destVertexId,msg);
}
- else if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
+ else if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
msg.set(getVertexId(), msg.getChainVertexId(), msg.getHeadVertexId(), (byte)0, true);
sendMsg(destVertexId,msg);
}
}else{// is Rear
chainVertexId.set(msg.getSourceVertexId());
- vertexVal.set(GraphVertexOperation.updateRightNeighberByVertexId(vertexVal.getAdjMap(), chainVertexId, kmerSize),
+ getVertexValue().set(GraphVertexOperation.updateRightNeighberByVertexId(getVertexValue().getAdjMap(), chainVertexId, kmerSize),
State.START_VERTEX, msg.getChainVertexId());
- setVertexValue(vertexVal);
+ setVertexValue(getVertexValue());
//String source = msg.getChainVertexId().toString();
//System.out.print("");
}
@@ -158,9 +155,9 @@
public void compute(Iterator<NaiveAlgorithmMessageWritable> msgIterator) {
initVertex();
if (getSuperstep() == 1) {
- if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
+ if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
msg.set(getVertexId(), chainVertexId, getVertexId(), (byte)0, false);
- sendMsgToAllNextNodes(getVertexId(), vertexVal.getAdjMap());
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
}
}
else if(getSuperstep() == 2){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index 1f90dbd..d97a2fd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -40,9 +40,9 @@
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Path dir = new Path("data");
- Path inFile = new Path(dir, "part-1");
- Path outFile = new Path(dir, "part-1-out-3000000");
- generateNumOfLinesFromBigFile(inFile,outFile,3000000);
+ Path inFile = new Path(dir, "part-0");
+ Path outFile = new Path(dir, "part-0-out-5000000");
+ generateNumOfLinesFromBigFile(inFile,outFile,5000000);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index ae3c621..61bcb0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -9,7 +9,6 @@
public static final int FINAL_VERTEX = 5;
public static final int FINAL_DELETE = 6;
public static final int KILL_SELF = 7;
- public static final int NON_EXIST = 8;
public final static class STATE_CONTENT{
@@ -40,9 +39,6 @@
case KILL_SELF:
r = "KILL_SELF";
break;
- case NON_EXIST:
- r = "NON_EXIST";
- break;
}
return r;
}