update
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3228 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 47b740b..5b75e05 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -306,8 +306,8 @@
* merge two BytesWritable. Ex. merge two vertexId
*/
public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
- String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*k);
- return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), 1);
+ String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length);
+ return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), s2.length()/2);
}
/**
* update right neighber
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 68c201b..5a4a47c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -60,6 +60,8 @@
private byte[] tmpChainVertexId;
private byte[] mergeChainVertexId;
private int lengthOfMergeChainVertex;
+ private byte[] tmpMergeChainVertexId;
+ private int tmpLengthOfMergeChainVertex;
private byte tmpVertexValue;
private int tmpVertexState;
private int tmpMessage;
@@ -112,8 +114,8 @@
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
tmpMessage = tmpMsg.getMessage();
- tmpVertexState = getVertexValue().getState();
tmpVal = getVertexValue();
+ tmpVertexState = tmpVal.getState();
if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
tmpVal.setState(State.START_VERTEX);
setVertexValue(tmpVal);
@@ -134,21 +136,24 @@
else if(getSuperstep()%3 == 0){
tmpVal = getVertexValue();
tmpVertexValue = tmpVal.getValue();
- //if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
- // && !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
- if(msgIterator.hasNext())
+ tmpVertexState = tmpVal.getState();
+ tmpSourceVertextId = getVertexId().getBytes();
+ if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
+ && !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
+ // compute the right neighber - destination vertex Id
+ if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
- else
- tmpMsg = new LogAlgorithmMessageWritable();
- tmpVertexState = getVertexValue().getState();
- tmpSourceVertextId = getVertexId().getBytes();
- if(lengthOfMergeChainVertex == 0)
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- getVertexValue().getValue());
+ tmpLengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
+ tmpMergeChainVertexId = tmpMsg.getChainVertexId();
+ byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId,
+ tmpLengthOfMergeChainVertex);
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer,
+ tmpMsg.getNeighberInfo());
+ }
else{
- byte[] lastKmer = GraphVertexOperation.getLastKmer(mergeChainVertexId,
- lengthOfMergeChainVertex);
- tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, getVertexValue().getValue());
+ tmpMsg = new LogAlgorithmMessageWritable();
+ tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
+ getVertexValue().getValue());
}
/*if(tmpMsg.getLengthOfChain()== 0){
@@ -171,7 +176,7 @@
GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
tmpDestVertexId, tmpMsg, null);
}
- //}
+ }
}
//path node sends message back to head node
else if(getSuperstep()%3 == 1){
@@ -181,16 +186,24 @@
tmpMessage = tmpMsg.getMessage();
tmpSourceVertextId = tmpMsg.getSourceVertexId();
tmpChainVertexId = tmpMsg.getChainVertexId();
- //byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId,
- // tmpVal.getValue());
- tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ if(tmpVal.getLengthOfMergeChain() == 0){
+ tmpMsg.setSourceVertexId(getVertexId().getBytes());
+ tmpVal.setLengthOfMergeChain(GraphVertexOperation.k);
+ tmpVal.setMergeChain(getVertexId().getBytes());
+ setVertexValue(tmpVal);
+ }
+ else{
+ tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
+ tmpMsg.setSourceVertexId(tmpVal.getMergeChain());
+ }
+
tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
tmpMsg.setSourceVertexState(tmpVal.getState());
- /*tmpMsg.incrementLength();
- tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(
- tmpChainVertexId,
- tmpMsg.getLengthOfChain()-1,
- getVertexId().getBytes()));*/
+ if(tmpVal.getState() == State.END_VERTEX)
+ tmpMsg.setMessage(Message.END);
+ else
+ tmpMsg.setMessage(Message.NON);
+
sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
//test
GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
@@ -214,27 +227,47 @@
else if(getSuperstep()%3 == 2){
if(msgIterator.hasNext()){
tmpMsg = msgIterator.next();
- tmpVertexState = getVertexValue().getState();
+ tmpVal = getVertexValue();
+ if(tmpMsg.getMessage() == Message.END)
+ tmpVertexState = State.END_VERTEX;
+ else
+ tmpVertexState = getVertexValue().getState();
+ tmpVal.setState(tmpVertexState);
tmpSourceVertextId = getVertexId().getBytes();
if(tmpVertexState == State.TODELETE){
- GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "already merged by head");
+ GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), tmpSourceVertextId, "already merged by head");
deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
}
- if(getSuperstep() == 5){
- lengthOfMergeChainVertex = GraphVertexOperation.k;
- mergeChainVertexId = getVertexId().getBytes();
+ else{
+ if(getSuperstep() == 5){
+ lengthOfMergeChainVertex = GraphVertexOperation.k;
+ mergeChainVertexId = getVertexId().getBytes();
+ }
+ else{
+ lengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
+ mergeChainVertexId = tmpMsg.getChainVertexId();
+ }
+ mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
+ tmpMsg.getSourceVertexId());
+ lengthOfMergeChainVertex = 2*lengthOfMergeChainVertex - GraphVertexOperation.k + 1;
+ tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
+ tmpVal.setMergeChain(mergeChainVertexId);
+
+ //test
+ GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
+ mergeChainVertexId, lengthOfMergeChainVertex);
+ byte tmpByte = tmpMsg.getNeighberInfo();
+ tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
+ tmpVal.setValue(tmpVertexValue);
+ setVertexValue(tmpVal);
+ tmpMsg.setNeighberInfo(tmpVertexValue);
+ tmpMsg.setLengthOfChain(lengthOfMergeChainVertex);
+ tmpMsg.setChainVertexId(mergeChainVertexId);
+ sendMsg(getVertexId(),tmpMsg);
+ //test
+ GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+ getVertexId().getBytes(), tmpMsg, null);
}
- mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
- tmpMsg.getSourceVertexId());
- lengthOfMergeChainVertex++;
- //test
- GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
- mergeChainVertexId, lengthOfMergeChainVertex);
- byte tmpByte = tmpMsg.getNeighberInfo();
- tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
- tmpVal = getVertexValue();
- tmpVal.setValue(tmpVertexValue);
- setVertexValue(tmpVal);
if(tmpVertexState == State.END_VERTEX)
voteToHalt();
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
index b52c9a5..78d4f0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
@@ -11,14 +11,19 @@
private byte value;
private int state;
+ private int lengthOfMergeChain;
+ private byte[] mergeChain;
public ValueStateWritable() {
state = State.NON_VERTEX;
+ lengthOfMergeChain = 0;
}
- public ValueStateWritable(byte value, int state) {
+ public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
this.value = value;
this.state = state;
+ this.lengthOfMergeChain = lengthOfMergeChain;
+ this.mergeChain = mergeChain;
}
public byte getValue() {
@@ -37,16 +42,42 @@
this.state = state;
}
+ public int getLengthOfMergeChain() {
+ return lengthOfMergeChain;
+ }
+
+ public void setLengthOfMergeChain(int lengthOfMergeChain) {
+ this.lengthOfMergeChain = lengthOfMergeChain;
+ }
+
+ public byte[] getMergeChain() {
+ return mergeChain;
+ }
+
+ public void setMergeChain(byte[] mergeChain) {
+ this.mergeChain = mergeChain;
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
value = in.readByte();
state = in.readInt();
+ lengthOfMergeChain = in.readInt();
+ if(lengthOfMergeChain != 0){
+ mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
+ in.readFully(mergeChain);
+ }
+ else
+ mergeChain = new byte[0];
}
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(value);
out.writeInt(state);
+ out.writeInt(lengthOfMergeChain);
+ if(lengthOfMergeChain != 0)
+ out.write(mergeChain);
}
@Override