reduce some duplicate message communication and change int to byte
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
index 2e793da..c2d3f1c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/LogAlgorithmMessageWritable.java
@@ -7,6 +7,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.genomix.pregelix.operator.ThreeStepLogAlgorithmForPathMergeVertex;
+import edu.uci.ics.genomix.pregelix.type.CheckMessage;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -20,28 +21,39 @@
private KmerBytesWritable sourceVertexId;
private VKmerBytesWritable chainVertexId;
private byte adjMap;
- private int message;
- private int sourceVertexState;
+ private byte message;
+
+ private byte checkMessage;
public LogAlgorithmMessageWritable(){
sourceVertexId = new VKmerBytesWritable(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
chainVertexId = new VKmerBytesWritable(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
+ checkMessage = 0;
}
- public void set(KmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, int message, int sourceVertexState){
- this.sourceVertexId.set(sourceVertexId);
- this.chainVertexId.set(chainVertexId);
- this.adjMap = adjMap;
+ public void set(KmerBytesWritable sourceVertexId, VKmerBytesWritable chainVertexId, byte adjMap, byte message){
+ checkMessage = 0;
+ if(sourceVertexId != null){
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId);
+ }
+ if(chainVertexId != null){
+ checkMessage |= CheckMessage.CHAIN;
+ this.chainVertexId.set(chainVertexId);
+ }
+ if(adjMap != 0){
+ checkMessage |= CheckMessage.ADJMAP;
+ this.adjMap = adjMap;
+ }
this.message = message;
- this.sourceVertexState = sourceVertexState;
}
public void reset(){
- //sourceVertexId.reset(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
+ checkMessage = 0;
chainVertexId.reset(ThreeStepLogAlgorithmForPathMergeVertex.kmerSize);
adjMap = (byte)0;
message = 0;
- sourceVertexState = 0;
+ //sourceVertexState = 0;
}
public KmerBytesWritable getSourceVertexId() {
@@ -49,7 +61,10 @@
}
public void setSourceVertexId(KmerBytesWritable sourceVertexId) {
- this.sourceVertexId.set(sourceVertexId);
+ if(sourceVertexId != null){
+ checkMessage |= CheckMessage.SOURCE;
+ this.sourceVertexId.set(sourceVertexId);
+ }
}
public byte getAdjMap() {
@@ -68,42 +83,40 @@
this.chainVertexId.set(chainVertexId);
}
- public int getMessage() {
+ public byte getMessage() {
return message;
}
- public void setMessage(int message) {
+ public void setMessage(byte message) {
this.message = message;
}
- public int getSourceVertexState() {
- return sourceVertexState;
- }
-
- public void setSourceVertexState(int sourceVertexState) {
- this.sourceVertexState = sourceVertexState;
- }
-
public int getLengthOfChain() {
return chainVertexId.getKmerLength();
}
@Override
public void write(DataOutput out) throws IOException {
- sourceVertexId.write(out);
- chainVertexId.write(out);
- out.write(adjMap);
- out.writeInt(message);
- out.writeInt(sourceVertexState);
+ out.writeByte(checkMessage);
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.write(out);
+ if((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.write(out);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ out.write(adjMap);
+ out.writeByte(message);
}
@Override
public void readFields(DataInput in) throws IOException {
- sourceVertexId.readFields(in);
- chainVertexId.readFields(in);
- adjMap = in.readByte();
- message = in.readInt();
- sourceVertexState = in.readInt();
+ checkMessage = in.readByte();
+ if((checkMessage & CheckMessage.SOURCE) != 0)
+ sourceVertexId.readFields(in);
+ if((checkMessage & CheckMessage.CHAIN) != 0)
+ chainVertexId.readFields(in);
+ if((checkMessage & CheckMessage.ADJMAP) != 0)
+ adjMap = in.readByte();
+ message = in.readByte();
}
@Override
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
index 392acab..8931dcc 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/ValueStateWritable.java
@@ -14,7 +14,7 @@
public class ValueStateWritable implements WritableComparable<ValueStateWritable> {
private byte adjMap;
- private int state;
+ private byte state;
private VKmerBytesWritable mergeChain;
public ValueStateWritable() {
@@ -22,13 +22,13 @@
mergeChain = new VKmerBytesWritable(NaiveAlgorithmForPathMergeVertex.kmerSize);
}
- public ValueStateWritable(byte adjMap, int state, VKmerBytesWritable mergeChain) {
+ public ValueStateWritable(byte adjMap, byte state, VKmerBytesWritable mergeChain) {
this.adjMap = adjMap;
this.state = state;
this.mergeChain.set(mergeChain);
}
- public void set(byte adjMap, int state, VKmerBytesWritable mergeChain){
+ public void set(byte adjMap, byte state, VKmerBytesWritable mergeChain){
this.adjMap = adjMap;
this.state = state;
this.mergeChain.set(mergeChain);
@@ -42,11 +42,11 @@
this.adjMap = adjMap;
}
- public int getState() {
+ public byte getState() {
return state;
}
- public void setState(int state) {
+ public void setState(byte state) {
this.state = state;
}
@@ -69,14 +69,14 @@
@Override
public void readFields(DataInput in) throws IOException {
adjMap = in.readByte();
- state = in.readInt();
+ state = in.readByte();
mergeChain.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(adjMap);
- out.writeInt(state);
+ out.writeByte(state);
mergeChain.write(out);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
index d4f03ee..a615334 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/log/LogAlgorithmLogFormatter.java
@@ -16,7 +16,7 @@
private VKmerBytesWritable sourceVertexId = new VKmerBytesWritable(1);
private VKmerBytesWritable destVertexId = new VKmerBytesWritable(1);
private LogAlgorithmMessageWritable msg = new LogAlgorithmMessageWritable();
- private int state;
+ private byte state;
private VKmerBytesWritable mergeChain = new VKmerBytesWritable(1);;
//private boolean testDelete = false;
/** 0: general operation
@@ -30,7 +30,7 @@
}
public void set(long step, VKmerBytesWritable sourceVertexId,
- VKmerBytesWritable destVertexId, LogAlgorithmMessageWritable msg, int state){
+ VKmerBytesWritable destVertexId, LogAlgorithmMessageWritable msg, byte state){
this.step = step;
this.sourceVertexId.set(sourceVertexId);
this.destVertexId.set(destVertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
index 1a1ef8a..70ba487 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/ThreeStepLogAlgorithmForPathMergeVertex.java
@@ -178,7 +178,8 @@
getVertexValue().setMergeChain(getVertexId());
setVertexValue(getVertexValue());
}
- msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
+ msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
+ //msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
setMessageType(msg.getMessage());
destVertexId.set(msg.getSourceVertexId());
sendMsg(destVertexId,msg);
@@ -240,12 +241,14 @@
*/
public void startSendMsg(){
if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
+ msg.set(null, null, (byte)0, Message.START);
+ //msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
+ msg.set(null, null, (byte)0, Message.END);
+ //msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
index 526a57c..2bec590 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/TwoStepLogAlgorithmForPathMergeVertex.java
@@ -132,6 +132,7 @@
* send start message to next node
*/
public void sendStartMsgToNextNode(){
+ msg.reset();
msg.setMessage(Message.START);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
@@ -141,6 +142,7 @@
* send end message to next node
*/
public void sendEndMsgToNextNode(){
+ msg.reset();
msg.setMessage(Message.END);
msg.setSourceVertexId(getVertexId());
sendMsg(destVertexId, msg);
@@ -179,9 +181,10 @@
getVertexValue().setMergeChain(getVertexId());
setVertexValue(getVertexValue());
}
- msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
- setMessageType(msg.getMessage());
destVertexId.set(msg.getSourceVertexId());
+ msg.set(null, getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage());
+ //msg.set(msg.getSourceVertexId(), getVertexValue().getMergeChain(), getVertexValue().getAdjMap(), msg.getMessage(), getVertexValue().getState());
+ setMessageType(msg.getMessage());
sendMsg(destVertexId,msg);
}
/**
@@ -235,16 +238,18 @@
}
}
/**
- * start sending message
+ * start sending message
*/
public void startSendMsg(){
if(GraphVertexOperation.isHeadVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
+ msg.set(null, null, (byte)0, Message.START);
+ //msg.set(getVertexId(), chainVertexId, (byte)0, Message.START, State.NON_VERTEX); //msg.set(null, (byte)0, chainVertexId, Message.START, State.NON_VERTEX);
sendMsgToAllNextNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
if(GraphVertexOperation.isRearVertex(getVertexValue().getAdjMap())){
- msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
+ msg.set(null, null, (byte)0, Message.END);
+ //msg.set(getVertexId(), chainVertexId, (byte)0, Message.END, State.NON_VERTEX);
sendMsgToAllPreviousNodes(getVertexId(), getVertexValue().getAdjMap());
voteToHalt();
}
@@ -273,7 +278,7 @@
*/
public void sendMsgToPathVertex(Iterator<LogAlgorithmMessageWritable> msgIterator){
if(getSuperstep() == 3){
- msg.reset();
+ //msg.reset();
sendMsgToPathVertex(getVertexId(), getVertexValue().getAdjMap());
}
else{
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
index f556a73..9e82cc9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/Message.java
@@ -2,13 +2,13 @@
public class Message {
- public static final int NON = 0;
- public static final int START = 1;
- public static final int END = 2;
+ public static final byte NON = 0;
+ public static final byte START = 1;
+ public static final byte END = 2;
public final static class MESSAGE_CONTENT{
- public static String getContentFromCode(int code){
+ public static String getContentFromCode(byte code){
String r = "";
switch(code){
case NON:
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
index 61bcb0c..4bb40c9 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/State.java
@@ -1,18 +1,19 @@
package edu.uci.ics.genomix.pregelix.type;
public class State {
- public static final int NON_VERTEX = 0;
- public static final int START_VERTEX = 1;
- public static final int END_VERTEX = 2;
- public static final int MID_VERTEX = 3;
- public static final int TODELETE = 4;
- public static final int FINAL_VERTEX = 5;
- public static final int FINAL_DELETE = 6;
- public static final int KILL_SELF = 7;
+
+ public static final byte NON_VERTEX = 0;
+ public static final byte START_VERTEX = 1;
+ public static final byte END_VERTEX = 2;
+ public static final byte MID_VERTEX = 3;
+ public static final byte TODELETE = 4;
+ public static final byte FINAL_VERTEX = 5;
+ public static final byte FINAL_DELETE = 6;
+ public static final byte KILL_SELF = 7;
public final static class STATE_CONTENT{
- public static String getContentFromCode(int code){
+ public static String getContentFromCode(byte code){
String r = "";
switch(code){
case NON_VERTEX:
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index d5b42c2..ea9da04 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -55,6 +55,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ //job.getConfiguration().setInt(NaiveAlgorithmForPathMergeVertex.ITERATIONS, 10);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -73,6 +74,7 @@
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.KMER_SIZE, 5);
+ job.getConfiguration().setInt(ThreeStepLogAlgorithmForPathMergeVertex.ITERATIONS, 5);
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
@@ -105,8 +107,8 @@
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
//genLoadGraph();
- //genMergeGraph();
- genThreeStepLogAlgorithmForMergeGraph();
+ genMergeGraph();
+ //genThreeStepLogAlgorithmForMergeGraph();
//genTwoStepLogAlgorithmForMergeGraph();
}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
index 45d8185..fd0749a 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/sequencefile/TreePath";
+ private static final String DATA_PATH = "data/sequencefile/LongPath";
private static final String HDFS_PATH = "/webmap/";
private static final String HYRACKS_APP_NAME = "pregelix";