change ValueWritable.toString
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
index 7898d35..7a7e43d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/LogAlgorithmForPathMergeInputFormat.java
@@ -32,7 +32,7 @@
@SuppressWarnings("rawtypes")
class BinaryLoadGraphReader extends
BinaryVertexReader<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
- private Vertex vertex;
+ private Vertex vertex = null;
private KmerBytesWritable vertexId = null;
private ValueStateWritable vertexValue = new ValueStateWritable();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index ac84d8e..108a2d5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -7,38 +7,49 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.operator.LogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
-public class LogAlgorithmMessageWritable implements WritableComparable<LogAlgorithmMessageWritable>{
+public class LogAlgorithmMessageWritable implements
+ WritableComparable<LogAlgorithmMessageWritable> {
/**
* sourceVertexId stores source vertexId when headVertex sends the message
- * stores neighber vertexValue when pathVertex sends the message
- * chainVertexId stores the chains of connected DNA
- * file stores the point to the file that stores the chains of connected DNA
+ * stores neighber vertexValue when pathVertex sends the message
+ * chainVertexId stores the chains of connected DNA file stores the point to
+ * the file that stores the chains of connected DNA
*/
private VKmerBytesWritable sourceVertexId;
private VKmerBytesWritable chainVertexId;
private byte adjMap;
private int message;
private int sourceVertexState;
-
- public LogAlgorithmMessageWritable(){
- sourceVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId = new VKmerBytesWritable(LogAlgorithmForPathMergeVertex.kmerSize);
+
+ public LogAlgorithmMessageWritable() {
+ if (LogAlgorithmForPathMergeVertex.kmerSize > 0) {
+ sourceVertexId = new VKmerBytesWritable(
+ LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId = new VKmerBytesWritable(
+ LogAlgorithmForPathMergeVertex.kmerSize);
+ } else {
+ sourceVertexId = new VKmerBytesWritable(55);
+ chainVertexId = new VKmerBytesWritable(55);
+ }
}
-
- public void set(VKmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
+
+ public void set(KmerBytesWritable sourceVertexId,
+ KmerBytesWritable chainVertexId, byte adjMap, int message,
+ int sourceVertexState) {
this.sourceVertexId.set(sourceVertexId);
this.chainVertexId.set(chainVertexId);
this.adjMap = adjMap;
this.message = message;
this.sourceVertexState = sourceVertexState;
}
-
- public void reset(){
- //sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- chainVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
- adjMap = (byte)0;
+
+ public void reset() {
+ // sourceVertexId.reset(LogAlgorithmForPathMergeVertex.kmerSize);
+ chainVertexId.reset(0);
+ adjMap = (byte) 0;
message = 0;
sourceVertexState = 0;
}
@@ -47,7 +58,7 @@
return sourceVertexId;
}
- public void setSourceVertexId(VKmerBytesWritable sourceVertexId) {
+ public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
this.sourceVertexId.set(sourceVertexId);
}
@@ -86,7 +97,7 @@
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
sourceVertexId.write(out);
@@ -105,25 +116,25 @@
sourceVertexState = in.readInt();
}
- @Override
- public int hashCode() {
- return chainVertexId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof NaiveAlgorithmMessageWritable) {
- LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
- return chainVertexId.equals(tp.chainVertexId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return chainVertexId.toString();
- }
-
+ @Override
+ public int hashCode() {
+ return chainVertexId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof LogAlgorithmMessageWritable) {
+ LogAlgorithmMessageWritable tp = (LogAlgorithmMessageWritable) o;
+ return chainVertexId.equals(tp.chainVertexId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return chainVertexId.toString();
+ }
+
@Override
public int compareTo(LogAlgorithmMessageWritable tp) {
return chainVertexId.compareTo(tp.chainVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 769277c..a10014a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -19,7 +19,11 @@
public ValueStateWritable() {
state = State.NON_VERTEX;
- mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ if (NaiveAlgorithmForPathMergeVertex.kmerSize > 0){
+ mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
+ }else{
+ mergeChain = new VKmerBytesWritable(55);
+ }
}
public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
index fa77d43..7b2f2ca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/LogAlgorithmForPathMergeVertex.java
@@ -24,13 +24,13 @@
* vertexValue: ValueStateWritable
* edgeValue: NullWritable
* message: LogAlgorithmMessageWritable
- *
+ *
* DNA:
* A: 00
* C: 01
* G: 10
* T: 11
- *
+ *
* succeed node
* A 00000001 1
* G 00000010 2
@@ -42,74 +42,83 @@
* C 01000000 64
* T 10000000 128
*
- * For example, ONE LINE in input file: 00,01,10 0001,0010,
+ * For example, ONE LINE in input file: 00,01,10 0001,0010,
* That means that vertexId is ACG, its succeed node is A and its precursor node is C.
* The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
- * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable.
*/
-public class LogAlgorithmForPathMergeVertex extends Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
-
+public class LogAlgorithmForPathMergeVertex
+ extends
+ Vertex<KmerBytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+
public static final String KMER_SIZE = "LogAlgorithmForPathMergeVertex.kmerSize";
- public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
public static int kmerSize = -1;
+ public static final String ITERATIONS = "NaiveAlgorithmForPathMergeVertex.iteration";
private int maxIteration = -1;
-
- private ValueStateWritable vertexVal = new ValueStateWritable();
-
+
private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
-
- private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(1);
- private VKmerBytesWritable vertexId = new VKmerBytesWritable(1);
- private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
+
+ private VKmerBytesWritableFactory kmerFactory = new VKmerBytesWritableFactory(
+ 1);
+ private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable chainVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable lastKmer = new VKmerBytesWritable(1);
+
/**
* initiate kmerSize, maxIteration
*/
- public void initVertex(){
- if(kmerSize == -1)
+ public void initVertex() {
+ if (kmerSize == -1)
kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
- if (maxIteration < 0)
- maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 100);
- vertexId.set(getVertexId());
- vertexVal = getVertexValue();
+ if (maxIteration < 0)
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS,
+ 100);
}
/**
* get destination vertex
*/
- public VKmerBytesWritable getNextDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+ public VKmerBytesWritable getNextDestVertexId(KmerBytesWritable vertexId,
+ byte geneCode) {
return kmerFactory.shiftKmerWithNextCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getPreDestVertexId(VKmerBytesWritable vertexId, byte geneCode){
+
+ public VKmerBytesWritable getPreDestVertexId(KmerBytesWritable vertexId,
+ byte geneCode) {
return kmerFactory.shiftKmerWithPreCode(vertexId, geneCode);
}
-
- public VKmerBytesWritable getNextDestVertexIdFromBitmap(VKmerBytesWritable chainVertexId, byte adjMap){
- return getDestVertexIdFromChain(chainVertexId, adjMap);//GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)
+
+ public VKmerBytesWritable getNextDestVertexIdFromBitmap(
+ KmerBytesWritable chainVertexId, byte adjMap) {
+ return getDestVertexIdFromChain(chainVertexId, adjMap);// GeneCode.getGeneCodeFromBitMap((byte)(adjMap
+ // & 0x0F)
}
-
- public VKmerBytesWritable getDestVertexIdFromChain(VKmerBytesWritable chainVertexId, byte adjMap){
+
+ public VKmerBytesWritable getDestVertexIdFromChain(
+ KmerBytesWritable chainVertexId, byte adjMap) {
lastKmer.set(kmerFactory.getLastKmerFromChain(kmerSize, chainVertexId));
- return getNextDestVertexId(lastKmer, GeneCode.getGeneCodeFromBitMap((byte)(adjMap & 0x0F)));
+ return getNextDestVertexId(lastKmer,
+ GeneCode.getGeneCodeFromBitMap((byte) (adjMap & 0x0F)));
}
+
/**
* head send message to all next nodes
*/
- public void sendMsgToAllNextNodes(VKmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if((adjMap & (1 << x)) != 0){
+ public void sendMsgToAllNextNodes(KmerBytesWritable vertexId, byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if ((adjMap & (1 << x)) != 0) {
destVertexId.set(getNextDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
}
}
+
/**
* head send message to all previous nodes
*/
- public void sendMsgToAllPreviousNodes(VKmerBytesWritable vertexId, byte adjMap){
- for(byte x = GeneCode.A; x<= GeneCode.T ; x++){
- if(((adjMap >> 4) & (1 << x)) != 0){
+ public void sendMsgToAllPreviousNodes(KmerBytesWritable vertexId,
+ byte adjMap) {
+ for (byte x = GeneCode.A; x <= GeneCode.T; x++) {
+ if (((adjMap >> 4) & (1 << x)) != 0) {
destVertexId.set(getPreDestVertexId(vertexId, x));
sendMsg(destVertexId, msg);
}
@@ -119,253 +128,282 @@
/**
* set vertex state
*/
- public void setState(){
- if(msg.getMessage() == Message.START &&
- (vertexVal.getState() == State.MID_VERTEX || vertexVal.getState() == State.END_VERTEX)){
- vertexVal.setState(State.START_VERTEX);
- setVertexValue(vertexVal);
- }
- else if(msg.getMessage() == Message.END && vertexVal.getState() == State.MID_VERTEX){
- vertexVal.setState(State.END_VERTEX);
- setVertexValue(vertexVal);
+ public void setState() {
+ if (msg.getMessage() == Message.START
+ && (getVertexValue().getState() == State.MID_VERTEX || getVertexValue()
+ .getState() == State.END_VERTEX)) {
+ getVertexValue().setState(State.START_VERTEX);
+ setVertexValue(getVertexValue());
+ } else if (msg.getMessage() == Message.END
+ && getVertexValue().getState() == State.MID_VERTEX) {
+ getVertexValue().setState(State.END_VERTEX);
+ setVertexValue(getVertexValue());
voteToHalt();
- }
- else
+ } else
voteToHalt();
}
+
/**
* send start message to next node
*/
- public void sendStartMsgToNextNode(){
+ public void sendStartMsgToNextNode() {
msg.setMessage(Message.START);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
+
/**
* send end message to next node
*/
- public void sendEndMsgToNextNode(){
+ public void sendEndMsgToNextNode() {
msg.setMessage(Message.END);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
voteToHalt();
}
+
/**
* send non message to next node
*/
- public void sendNonMsgToNextNode(){
+ public void sendNonMsgToNextNode() {
msg.setMessage(Message.NON);
- msg.setSourceVertexId(vertexId);
+ msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
}
+
/**
* head send message to path
*/
- public void sendMsgToPathVertex(VKmerBytesWritable chainVertexId, byte adjMap){
- if(GeneCode.getGeneCodeFromBitMap((byte)(vertexVal.getAdjMap() & 0x0F)) == -1) //|| lastKmer == null
+ public void sendMsgToPathVertex(KmerBytesWritable chainVertexId,
+ byte adjMap) {
+ if (GeneCode
+ .getGeneCodeFromBitMap((byte) (getVertexValue().getAdjMap() & 0x0F)) == -1) // ||// lastKmer == null
+
voteToHalt();
- else{
- destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId, adjMap));
- if(vertexVal.getState() == State.START_VERTEX){
+ else {
+ destVertexId.set(getNextDestVertexIdFromBitmap(chainVertexId,
+ adjMap));
+ if (getVertexValue().getState() == State.START_VERTEX) {
sendStartMsgToNextNode();
- }
- else if(vertexVal.getState() != State.END_VERTEX && vertexVal.getState() != State.FINAL_DELETE){
+ } else if (getVertexValue().getState() != State.END_VERTEX
+ && getVertexValue().getState() != State.FINAL_DELETE) {
sendEndMsgToNextNode();
}
}
}
+
/**
- * path send message to head
+ * path send message to head
*/
- public void responseMsgToHeadVertex(){
- if(vertexVal.getLengthOfMergeChain() == -1){
- vertexVal.setMergeChain(vertexId);
- setVertexValue(vertexVal);
+ public void responseMsgToHeadVertex() {
+ if (getVertexValue().getLengthOfMergeChain() == -1) {
+ getVertexValue().setMergeChain(getVertexId());
+ setVertexValue(getVertexValue());
}
- msg.set(msg.getSourceVertexId(), vertexVal.getMergeChain(), vertexVal.getAdjMap(), msg.getMessage(), vertexVal.getState());
+ msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(),
+ getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
setMessageType(msg.getMessage());
destVertexId.set(msg.getSourceVertexId());
- sendMsg(destVertexId,msg);
+ sendMsg(destVertexId, msg);
}
+
/**
* set message type
*/
- public void setMessageType(int message){
- //kill Message because it has been merged by the head
- if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ public void setMessageType(int message) {
+ // kill Message because it has been merged by the head
+ if (getVertexValue().getState() == State.END_VERTEX
+ || getVertexValue().getState() == State.FINAL_DELETE) {
msg.setMessage(Message.END);
- vertexVal.setState(State.FINAL_DELETE);
- setVertexValue(vertexVal);
- //deleteVertex(getVertexId());
- }
- else
+ getVertexValue().setState(State.FINAL_DELETE);
+ setVertexValue(getVertexValue());
+ // deleteVertex(getVertexId());
+ } else
msg.setMessage(Message.NON);
-
- if(message == Message.START){
- vertexVal.setState(State.TODELETE);
- setVertexValue(vertexVal);
+
+ if (message == Message.START) {
+ getVertexValue().setState(State.TODELETE);
+ setVertexValue(getVertexValue());
}
}
+
/**
- * set vertexValue's state chainVertexId, value
+ * set vertexValue's state chainVertexId, value
*/
- public void setVertexValueAttributes(){
- if(msg.getMessage() == Message.END){
- if(vertexVal.getState() != State.START_VERTEX)
- vertexVal.setState(State.END_VERTEX);
+ public void setVertexValueAttributes() {
+ if (msg.getMessage() == Message.END) {
+ if (getVertexValue().getState() != State.START_VERTEX)
+ getVertexValue().setState(State.END_VERTEX);
else
- vertexVal.setState(State.FINAL_VERTEX);
+ getVertexValue().setState(State.FINAL_VERTEX);
}
-
- if(getSuperstep() == 5)
- chainVertexId.set(vertexId);
+
+ if (getSuperstep() == 5)
+ chainVertexId.set(getVertexId());
else
- chainVertexId.set(vertexVal.getMergeChain());
- lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain() - kmerSize + 1, msg.getChainVertexId()));
+ chainVertexId.set(getVertexValue().getMergeChain());
+ lastKmer.set(kmerFactory.getLastKmerFromChain(msg.getLengthOfChain()
+ - kmerSize + 1, msg.getChainVertexId()));
chainVertexId.set(kmerFactory.mergeTwoKmer(chainVertexId, lastKmer));
- vertexVal.setMergeChain(chainVertexId);
-
- byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getAdjMap(), msg.getAdjMap());
- vertexVal.setAdjMap(tmpVertexValue);
+ getVertexValue().setMergeChain(chainVertexId);
+
+ byte tmpVertexValue = GraphVertexOperation.updateRightNeighber(
+ getVertexValue().getAdjMap(), msg.getAdjMap());
+ getVertexValue().setAdjMap(tmpVertexValue);
}
+
/**
- * send message to self
+ * send message to self
*/
- public void sendMsgToSelf(){
- if(msg.getMessage() != Message.END){
- setVertexValue(vertexVal);
- msg.reset(); //reset
- msg.setAdjMap(vertexVal.getAdjMap());
- sendMsg(vertexId,msg);
+ public void sendMsgToSelf() {
+ if (msg.getMessage() != Message.END) {
+ setVertexValue(getVertexValue());
+ msg.reset(); // reset
+ msg.setAdjMap(getVertexValue().getAdjMap());
+ sendMsg(getVertexId(), msg);
}
}
+
/**
* start sending message
*/
- public void startSendMsg(){
- if(GraphVertexOperation.isHeadVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
- sendMsgToAllNextNodes(vertexId, vertexVal.getAdjMap());
+ public void startSendMsg() {
+ if (GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())) {
+ msg.set(getVertexId(), chainVertexId, (byte) 0, Message.START,
+ State.NON_VERTEX); // msg.set(null, (byte)0, chainVertexId,
+ // Message.START, State.NON_VERTEX);
+ sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if(GraphVertexOperation.isRearVertex(vertexVal.getAdjMap())){
- msg.set(vertexId, chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
- sendMsgToAllPreviousNodes(vertexId, vertexVal.getAdjMap());
+ if (GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())) {
+ msg.set(getVertexId(), chainVertexId, (byte) 0, Message.END,
+ State.NON_VERTEX);
+ sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
- if(GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
- vertexVal.setState(State.MID_VERTEX);
- setVertexValue(vertexVal);
+ if (GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
+ getVertexValue().setState(State.MID_VERTEX);
+ setVertexValue(getVertexValue());
}
}
+
/**
- * initiate head, rear and path node
+ * initiate head, rear and path node
*/
- public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator){
- while(msgIterator.hasNext()){
- if(!GraphVertexOperation.isPathVertex(vertexVal.getAdjMap())){
+ public void initState(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ while (msgIterator.hasNext()) {
+ if (!GraphVertexOperation.isPathVertex(getVertexValue().getAdjMap())) {
msgIterator.next();
voteToHalt();
- }
- else{
+ } else {
msg = msgIterator.next();
setState();
}
}
}
+
/**
* head send message to path
*/
- public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(getSuperstep() == 3){
+ public void sendMsgToPathVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (getSuperstep() == 3) {
msg.reset();
- sendMsgToPathVertex(vertexId, vertexVal.getAdjMap());
- }
- else{
- if(msgIterator.hasNext()){
+ sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
+ } else {
+ if (msgIterator.hasNext()) {
msg = msgIterator.next();
- sendMsgToPathVertex(vertexVal.getMergeChain(), msg.getAdjMap());
+ sendMsgToPathVertex(getVertexValue().getMergeChain(), msg.getAdjMap());
}
}
}
+
/**
* path response message to head
*/
- public void responseMsgToHeadVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
+ public void responseMsgToHeadVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (msgIterator.hasNext()) {
msg = msgIterator.next();
responseMsgToHeadVertex();
- //voteToHalt();
- }
- else{
- if(getVertexValue().getState() != State.START_VERTEX
- && getVertexValue().getState() != State.END_VERTEX && getVertexValue().getState() != State.FINAL_DELETE){
- //vertexVal.setState(State.KILL_SELF);
- //setVertexValue(vertexVal);
- //voteToHalt();
- deleteVertex(getVertexId());//killSelf because it doesn't receive any message
+ // voteToHalt();
+ } else {
+ if (getVertexValue().getState() != State.START_VERTEX
+ && getVertexValue().getState() != State.END_VERTEX
+ && getVertexValue().getState() != State.FINAL_DELETE) {
+ // vertexVal.setState(State.KILL_SELF);
+ // setVertexValue(vertexVal);
+ // voteToHalt();
+ deleteVertex(getVertexId());// killSelf because it doesn't
+ // receive any message
}
}
}
+
/**
* merge chainVertex and store in vertexVal.chainVertexId
*/
- public void mergeChainVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
- if(msgIterator.hasNext()){
+ public void mergeChainVertex(
+ Iterator<LogAlgorithmMessageWritable> msgIterator) {
+ if (msgIterator.hasNext()) {
msg = msgIterator.next();
setVertexValueAttributes();
sendMsgToSelf();
}
- if(vertexVal.getState() == State.END_VERTEX || vertexVal.getState() == State.FINAL_DELETE){
+ if (getVertexValue().getState() == State.END_VERTEX
+ || getVertexValue().getState() == State.FINAL_DELETE) {
voteToHalt();
}
- if(vertexVal.getState() == State.FINAL_VERTEX){
+ if (getVertexValue().getState() == State.FINAL_VERTEX) {
//String source = vertexVal.getMergeChain().toString();
voteToHalt();
}
}
+
@Override
public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
initVertex();
- if(vertexVal.getState() != State.NON_EXIST && vertexVal.getState() != State.KILL_SELF){
- if (getSuperstep() == 1)
+ if (getVertexValue().getState() != State.NON_EXIST
+ && getVertexValue().getState() != State.KILL_SELF) {
+ if (getSuperstep() == 1)
startSendMsg();
- else if(getSuperstep() == 2)
+ else if (getSuperstep() == 2)
initState(msgIterator);
- else if(getSuperstep()%3 == 0 && getSuperstep() <= maxIteration){
+ else if (getSuperstep() % 3 == 0 && getSuperstep() <= maxIteration) {
sendMsgToPathVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 1 && getSuperstep() <= maxIteration){
+ } else if (getSuperstep() % 3 == 1
+ && getSuperstep() <= maxIteration) {
responseMsgToHeadVertex(msgIterator);
- }
- else if(getSuperstep()%3 == 2 && getSuperstep() <= maxIteration){
- if(vertexVal.getState() == State.TODELETE){ //|| vertexVal.getState() == State.KILL_SELF)
- //vertexVal.setState(State.NON_EXIST);
- //setVertexValue(vertexVal);
- //voteToHalt();
- deleteVertex(getVertexId()); //killSelf
- }
- else{
+ } else if (getSuperstep() % 3 == 2
+ && getSuperstep() <= maxIteration) {
+ if (getVertexValue().getState() == State.TODELETE) { // || vertexVal.getState() == State.KILL_SELF)
+ // vertexVal.setState(State.NON_EXIST);
+ // setVertexValue(vertexVal);
+ // voteToHalt();
+ deleteVertex(getVertexId()); // killSelf
+ } else {
mergeChainVertex(msgIterator);
}
}
}
}
+
/**
* @param args
*/
public static void main(String[] args) throws Exception {
- PregelixJob job = new PregelixJob(LogAlgorithmForPathMergeVertex.class.getSimpleName());
- job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
- /**
- * BinaryInput and BinaryOutput~/
- */
- job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
- job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
- job.setOutputKeyClass(KmerBytesWritable.class);
- job.setOutputValueClass(ValueStateWritable.class);
- job.setDynamicVertexValueSize(true);
- Client.run(args, job);
+ PregelixJob job = new PregelixJob(
+ LogAlgorithmForPathMergeVertex.class.getSimpleName());
+ job.setVertexClass(LogAlgorithmForPathMergeVertex.class);
+ /**
+ * BinaryInput and BinaryOutput~/
+ */
+ job.setVertexInputFormatClass(LogAlgorithmForPathMergeInputFormat.class);
+ job.setVertexOutputFormatClass(LogAlgorithmForPathMergeOutputFormat.class);
+ job.setOutputKeyClass(KmerBytesWritable.class);
+ job.setOutputValueClass(ValueStateWritable.class);
+ job.setDynamicVertexValueSize(true);
+ Client.run(args, job);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
index d97a2fd..1f90dbd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateSmallFile.java
@@ -40,9 +40,9 @@
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Path dir = new Path("data");
- Path inFile = new Path(dir, "part-0");
- Path outFile = new Path(dir, "part-0-out-5000000");
- generateNumOfLinesFromBigFile(inFile,outFile,5000000);
+ Path inFile = new Path(dir, "part-1");
+ Path outFile = new Path(dir, "part-1-out-3000000");
+ generateNumOfLinesFromBigFile(inFile,outFile,3000000);
}
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
index c759261..8d06e8e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/sequencefile/GenerateTextFile.java
@@ -11,10 +11,11 @@
import edu.uci.ics.genomix.pregelix.io.ValueStateWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
public class GenerateTextFile {
- public static void generate() throws IOException{
+ public static void generateFromPathmergeResult() throws IOException{
BufferedWriter bw = new BufferedWriter(new FileWriter("text/log_TreePath"));
Configuration conf = new Configuration();
FileSystem fileSys = FileSystem.get(conf);
@@ -36,13 +37,32 @@
}
bw.close();
}
+ public static void generateFromGraphbuildResult() throws IOException{
+ BufferedWriter bw = new BufferedWriter(new FileWriter("textfile"));
+ Configuration conf = new Configuration();
+ FileSystem fileSys = FileSystem.get(conf);
+ Path path = new Path("data/input/part-0-out-3000000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, path, conf);
+ KmerBytesWritable key = new KmerBytesWritable(55);
+ KmerCountValue value = new KmerCountValue();
+
+ while(reader.next(key, value)){
+ if (key == null || value == null){
+ break;
+ }
+ bw.write(key.toString());
+ bw.newLine();
+ }
+ reader.close();
+ bw.close();
+ }
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
- // TODO Auto-generated method stub
- generate();
+ generateFromPathmergeResult();
+ generateFromGraphbuildResult();
}
}