update

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3228 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 47b740b..5b75e05 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -306,8 +306,8 @@
 	 * merge two BytesWritable. Ex. merge two vertexId
 	 */
 	public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
-		String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*k);
-		return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), 1);
+		String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length);
+		return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), s2.length()/2);
 	}
 	/**
 	 * update right neighber
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 68c201b..5a4a47c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -60,6 +60,8 @@
 	private byte[] tmpChainVertexId;
 	private byte[] mergeChainVertexId;
 	private int lengthOfMergeChainVertex;
+	private byte[] tmpMergeChainVertexId;
+	private int tmpLengthOfMergeChainVertex;
 	private byte tmpVertexValue;
 	private int tmpVertexState;
 	private int tmpMessage;
@@ -112,8 +114,8 @@
 			if(msgIterator.hasNext()){
 				tmpMsg = msgIterator.next();
 				tmpMessage = tmpMsg.getMessage();
-				tmpVertexState = getVertexValue().getState();
 				tmpVal = getVertexValue();
+				tmpVertexState = tmpVal.getState();
 				if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
 					tmpVal.setState(State.START_VERTEX);
 					setVertexValue(tmpVal);
@@ -134,21 +136,24 @@
 		else if(getSuperstep()%3 == 0){
 			tmpVal = getVertexValue();
 			tmpVertexValue = tmpVal.getValue();
-			//if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
-			//		&& !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
-				if(msgIterator.hasNext())
+			tmpVertexState = tmpVal.getState();
+			tmpSourceVertextId = getVertexId().getBytes();
+			if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
+					&& !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
+				// compute the right neighber - destination vertex Id
+				if(msgIterator.hasNext()){
 					tmpMsg = msgIterator.next();
-				else
-					tmpMsg = new LogAlgorithmMessageWritable();
-				tmpVertexState = getVertexValue().getState();
-				tmpSourceVertextId = getVertexId().getBytes();
-				if(lengthOfMergeChainVertex == 0)
-					tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
-						getVertexValue().getValue());
+					tmpLengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
+					tmpMergeChainVertexId = tmpMsg.getChainVertexId();
+					byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId, 
+							tmpLengthOfMergeChainVertex);
+					tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, 
+							tmpMsg.getNeighberInfo());
+				}
 				else{
-					byte[] lastKmer = GraphVertexOperation.getLastKmer(mergeChainVertexId, 
-							lengthOfMergeChainVertex);
-					tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, getVertexValue().getValue());
+					tmpMsg = new LogAlgorithmMessageWritable();
+					tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
+							getVertexValue().getValue());
 				}
 				
 				/*if(tmpMsg.getLengthOfChain()== 0){
@@ -171,7 +176,7 @@
 					GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
 							tmpDestVertexId, tmpMsg, null);
 				}
-			//}
+			}
 		}
 		//path node sends message back to head node
 		else if(getSuperstep()%3 == 1){
@@ -181,16 +186,24 @@
 				tmpMessage = tmpMsg.getMessage();
 				tmpSourceVertextId = tmpMsg.getSourceVertexId();
 				tmpChainVertexId = tmpMsg.getChainVertexId();
-				//byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
-				//		tmpVal.getValue());
-				tmpMsg.setSourceVertexId(getVertexId().getBytes());
+				if(tmpVal.getLengthOfMergeChain() == 0){
+					tmpMsg.setSourceVertexId(getVertexId().getBytes());
+					tmpVal.setLengthOfMergeChain(GraphVertexOperation.k);
+					tmpVal.setMergeChain(getVertexId().getBytes());
+					setVertexValue(tmpVal);
+				}
+				else{
+					tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
+					tmpMsg.setSourceVertexId(tmpVal.getMergeChain());
+				}
+				
 				tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
 				tmpMsg.setSourceVertexState(tmpVal.getState());
-				/*tmpMsg.incrementLength();
-				tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(
-						tmpChainVertexId,
-						tmpMsg.getLengthOfChain()-1,
-						getVertexId().getBytes()));*/
+				if(tmpVal.getState() == State.END_VERTEX)
+					tmpMsg.setMessage(Message.END);
+				else
+					tmpMsg.setMessage(Message.NON);
+
 				sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
 				//test
 				GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
@@ -214,27 +227,47 @@
 		else if(getSuperstep()%3 == 2){
 			if(msgIterator.hasNext()){
 				tmpMsg = msgIterator.next();
-				tmpVertexState = getVertexValue().getState();
+				tmpVal = getVertexValue();
+				if(tmpMsg.getMessage() == Message.END)
+					tmpVertexState = State.END_VERTEX;
+				else
+					tmpVertexState = getVertexValue().getState();
+				tmpVal.setState(tmpVertexState);
 				tmpSourceVertextId = getVertexId().getBytes();
 				if(tmpVertexState == State.TODELETE){
-					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "already merged by head");
+					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), tmpSourceVertextId, "already merged by head");
 					deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
 				}
-				if(getSuperstep() == 5){
-					lengthOfMergeChainVertex = GraphVertexOperation.k;
-					mergeChainVertexId = getVertexId().getBytes();
+				else{
+					if(getSuperstep() == 5){
+						lengthOfMergeChainVertex = GraphVertexOperation.k;
+						mergeChainVertexId = getVertexId().getBytes();
+					}
+					else{
+						lengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
+						mergeChainVertexId = tmpMsg.getChainVertexId();
+					}
+					mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
+							tmpMsg.getSourceVertexId());
+					lengthOfMergeChainVertex = 2*lengthOfMergeChainVertex - GraphVertexOperation.k + 1;
+					tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
+					tmpVal.setMergeChain(mergeChainVertexId);
+					
+					//test
+					GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
+							mergeChainVertexId, lengthOfMergeChainVertex);
+					byte tmpByte = tmpMsg.getNeighberInfo();
+					tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
+					tmpVal.setValue(tmpVertexValue);
+					setVertexValue(tmpVal);
+					tmpMsg.setNeighberInfo(tmpVertexValue);
+					tmpMsg.setLengthOfChain(lengthOfMergeChainVertex);
+					tmpMsg.setChainVertexId(mergeChainVertexId);
+					sendMsg(getVertexId(),tmpMsg);
+					//test
+					GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+							getVertexId().getBytes(), tmpMsg, null);
 				}
-				mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
-						tmpMsg.getSourceVertexId());
-				lengthOfMergeChainVertex++;
-				//test
-				GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
-						mergeChainVertexId, lengthOfMergeChainVertex);
-				byte tmpByte = tmpMsg.getNeighberInfo();
-				tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
-				tmpVal = getVertexValue();
-				tmpVal.setValue(tmpVertexValue);
-				setVertexValue(tmpVal);
 				if(tmpVertexState == State.END_VERTEX)
 					voteToHalt();
 			}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
index b52c9a5..78d4f0c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/ValueStateWritable.java
@@ -11,14 +11,19 @@
 
 	private byte value;
 	private int state;
+	private int lengthOfMergeChain;
+	private byte[] mergeChain;
 
 	public ValueStateWritable() {
 		state = State.NON_VERTEX;
+		lengthOfMergeChain = 0;
 	}
 
-	public ValueStateWritable(byte value, int state) {
+	public ValueStateWritable(byte value, int state, int lengthOfMergeChain, byte[] mergeChain) {
 		this.value = value;
 		this.state = state;
+		this.lengthOfMergeChain = lengthOfMergeChain;
+		this.mergeChain = mergeChain;
 	}
 
 	public byte getValue() {
@@ -37,16 +42,42 @@
 		this.state = state;
 	}
 
+	public int getLengthOfMergeChain() {
+		return lengthOfMergeChain;
+	}
+
+	public void setLengthOfMergeChain(int lengthOfMergeChain) {
+		this.lengthOfMergeChain = lengthOfMergeChain;
+	}
+
+	public byte[] getMergeChain() {
+		return mergeChain;
+	}
+
+	public void setMergeChain(byte[] mergeChain) {
+		this.mergeChain = mergeChain;
+	}
+
 	@Override
 	public void readFields(DataInput in) throws IOException {
 		value = in.readByte();
 		state = in.readInt();
+		lengthOfMergeChain = in.readInt();
+		if(lengthOfMergeChain != 0){
+			mergeChain = new byte[(lengthOfMergeChain-1)/4 + 1];
+			in.readFully(mergeChain);
+		}
+		else
+			mergeChain = new byte[0];
 	}
 
 	@Override
 	public void write(DataOutput out) throws IOException {
 		out.writeByte(value);
 		out.writeInt(state);
+		out.writeInt(lengthOfMergeChain);
+		if(lengthOfMergeChain != 0)
+			out.write(mergeChain);
 	}
 
 	@Override