update

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3133 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
index 7cf8711..f203297 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/BinaryLoadGraphInputFormat.java
@@ -61,17 +61,6 @@
 	            /**
 	             * set the src vertex id
 	             */
-            	/*vertexId = getRecordReader().getCurrentKey();
-            	byte[] vertexBytes = vertexId.getBytes();
-            	int numOfByte = (2*GraphVertexOperation.k-1)/8 + 1;
-        		if(vertexBytes.length == numOfByte)
-        			vertex.setVertexId(vertexId);
-        		else{
-        			byte[] tmp = new byte[numOfByte];
-        			for(int i = 0; i < numOfByte; i++)
-        				tmp[i] = vertexBytes[i];
-        			vertex.setVertexId(new BytesWritable(tmp));
-        		}*/
 	            
         		vertexId = getRecordReader().getCurrentKey();
         		vertex.setVertexId(vertexId);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 9260a2f..47b740b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -17,7 +17,9 @@
 
 import edu.uci.ics.pregelix.SequenceFile.GenerateSequenceFile;
 import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
 import edu.uci.ics.pregelix.example.io.MessageWritable;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
 import edu.uci.ics.pregelix.hdfs.HDFSOperation;
 import edu.uci.ics.pregelix.type.KmerCountValue;
 
@@ -170,6 +172,41 @@
 		}
 	}
 	/**
+	 * check what kind of precursor node
+	 * return 0:A 1:C 2:G 3:T 4:nothing
+	 */
+	public static int findPrecursorNode(byte vertexValue){
+		String firstBit = "00010000"; //A
+		String secondBit = "00100000"; //C
+		String thirdBit = "01000000"; //G
+		String fourthBit = "10000000"; //T
+		int first = BitwiseOperation.convertBinaryStringToByte(firstBit) & 0xff;
+		int second = BitwiseOperation.convertBinaryStringToByte(secondBit) & 0xff;
+		int third = BitwiseOperation.convertBinaryStringToByte(thirdBit) & 0xff;
+		int fourth = BitwiseOperation.convertBinaryStringToByte(fourthBit) & 0xff;
+		int value = vertexValue & 0xff;
+		int tmp = value & first;
+		if(tmp != 0)
+			return 0;
+		else{
+			tmp = value & second;
+			if(tmp != 0)
+				return 1;
+			else{
+				tmp = value & third;
+				if(tmp != 0)
+					return 2;
+				else{
+					tmp = value & fourth;
+					if(tmp != 0)
+						return 3;
+					else
+						return 4;
+				}
+			}
+		}
+	}
+	/**
 	 * replace last two bits based on n
 	 * Ex. 01 10 00(nothing)	->	01 10 00(A)/01(C)/10(G)/11(T)		
 	 */
@@ -198,13 +235,47 @@
 		return BitwiseOperation.convertBinaryStringToBytes(resultString);
 	}
 	/**
-	 * find the vertexId of the destination node
+	 * replace first two bits based on n
+	 * Ex. 01 10 00(nothing)	->	00(A)/01(C)/10(G)/11(T) 10 00	
+	 */
+	public static byte[] replaceFirstTwoBits(byte[] vertexId, int n){
+		String binaryStringVertexId = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, k);
+		String resultString = "";
+		switch(n){
+		case 0:
+			resultString += "00";
+			break;
+		case 1:
+			resultString += "01";
+			break;
+		case 2:
+			resultString += "10";
+			break;
+		case 3:
+			resultString += "11";
+			break;
+		default:
+			break;
+		}
+		for(int i = 2; i < binaryStringVertexId.length(); i++)
+			resultString += binaryStringVertexId.charAt(i);
+		return BitwiseOperation.convertBinaryStringToBytes(resultString);
+	}
+	/**
+	 * find the vertexId of the destination node - left neighber
 	 */
 	public static byte[] getDestVertexId(byte[] sourceVertexId, byte vertexValue){
 		byte[] destVertexId = BitwiseOperation.shiftBitsLeft(sourceVertexId, 2);
 		return replaceLastTwoBits(destVertexId, findSucceedNode(vertexValue));
 	}
 	/**
+	 * find the vertexId of the destination node - right neighber
+	 */
+	public static byte[] getLeftDestVertexId(byte[] sourceVertexId, byte vertexValue){
+		byte[] destVertexId = BitwiseOperation.shiftBitsRight(sourceVertexId, 2);
+		return replaceFirstTwoBits(destVertexId, findPrecursorNode(vertexValue));
+	}
+	/**
 	 * update the chain vertexId
 	 */
 	public static byte[] updateChainVertexId(byte[] chainVertexId, int lengthOfChainVertex, byte[] newVertexId){
@@ -234,8 +305,9 @@
 	/**
 	 * merge two BytesWritable. Ex. merge two vertexId
 	 */
-	public static BytesWritable mergeTwoChainVertex(byte[] b1, byte[] b2, int length){
-		return new BytesWritable(BitwiseOperation.mergeTwoBytesArray(b1, length, b2, length));
+	public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
+		String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*k);
+		return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), 1);
 	}
 	/**
 	 * update right neighber
@@ -395,6 +467,42 @@
     	return;
 	}
 	/**
+	 *  output test for message communication
+	 */
+	public static void testMessageCommunication2(OutputStreamWriter writer, long step, byte[] tmpSourceVertextId,
+			byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg, byte[] myownId){
+		//test
+    	String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+    			tmpSourceVertextId,GraphVertexOperation.k);
+    	try {
+    		writer.write("Step: " + step + "\r\n");
+			writer.write("Source Key: " + kmer + "\r\n");
+		
+        	writer.write("Source Code: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+        	writer.write("Send Message to: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(
+		    				BitwiseOperation.convertBytesToBinaryStringKmer(
+		    						tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+        	if(tmpMsg.getLengthOfChain() != 0){
+        		writer.write("Chain Message: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(
+		    						BitwiseOperation.convertBytesToBinaryString(
+		    								tmpMsg.getChainVertexId())) + "\r\n");
+        		writer.write("Chain Length: " + tmpMsg.getLengthOfChain() + "\r\n"); 
+        	}
+        	if(myownId != null)
+        		writer.write("My own Id is: " + 
+        				GraphVertexOperation.convertBinaryStringToGenecode(
+	    						BitwiseOperation.convertBytesToBinaryStringKmer(
+	    								myownId,GraphVertexOperation.k)) + "\r\n");
+        	if(tmpMsg.getMessage() != 0)
+        		writer.write("Message is: " + tmpMsg.getMessage() + "\r\n");
+        	writer.write("\r\n");
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}
+	/**
 	 *  output test for last message communication -- flush
 	 */
 	public static void testLastMessageCommunication(OutputStreamWriter writer, long step, byte[] tmpVertextId,
@@ -420,5 +528,88 @@
 			e.printStackTrace();
 		}
 	}
+	/**
+	 *  output test for log message communication
+	 */
+	public static void testLogMessageCommunication(OutputStreamWriter writer, long step, byte[] tmpSourceVertextId,
+			byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg){
+		//test
+    	String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+    			tmpSourceVertextId,GraphVertexOperation.k);
+    	try {
+    		writer.write("Step: " + step + "\r\n");
+			writer.write("Source Key: " + kmer + "\r\n");
 		
+        	writer.write("Source Code: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+        	writer.write("Send Message to: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(
+		    				BitwiseOperation.convertBytesToBinaryStringKmer(
+		    						tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+        	writer.write("Message is: " +
+        			tmpMsg.getMessage() + "\r\n");
+        	writer.write("\r\n");
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}	
+	/**
+	 *  test set vertex state
+	 */
+	public static void testSetVertexState(OutputStreamWriter writer, long step,byte[] tmpSourceVertextId,
+			byte[] tmpDestVertexId, LogAlgorithmMessageWritable tmpMsg, ValueStateWritable tmpVal){
+		//test
+    	String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(
+    			tmpSourceVertextId,GraphVertexOperation.k);
+    	try {
+    		writer.write("Step: " + step + "\r\n");
+			writer.write("Source Key: " + kmer + "\r\n");
+		
+        	writer.write("Source Code: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(kmer) + "\r\n");
+        	if(tmpDestVertexId != null && tmpMsg != null){
+	        	writer.write("Send Message to: " + 
+			    		GraphVertexOperation.convertBinaryStringToGenecode(
+			    				BitwiseOperation.convertBytesToBinaryStringKmer(
+			    						tmpDestVertexId,GraphVertexOperation.k)) + "\r\n");
+	        	writer.write("Message is: " +
+	        			tmpMsg.getMessage() + "\r\n");
+        	}
+        	writer.write("Set vertex state to " +
+        			tmpVal.getState() + "\r\n");
+        	writer.write("\r\n");
+
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}
+	/**
+	 *  test delete vertex information
+	 */
+	public static void testDeleteVertexInfo(OutputStreamWriter writer, long step, byte[] vertexId, String reason){
+		try {
+    		writer.write("Step: " + step + "\r\n");
+			writer.write(reason + "\r\n");
+			writer.write("delete " + BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, GraphVertexOperation.k) 
+				 + "\t" + GraphVertexOperation.convertBinaryStringToGenecode(
+		    				BitwiseOperation.convertBytesToBinaryStringKmer(
+		    						vertexId,GraphVertexOperation.k)) + "\r\n");
+			writer.write("\r\n");
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}
+	/**
+	 *  test merge chain vertex
+	 */
+	public static void testMergeChainVertex(OutputStreamWriter writer, long step, byte[] mergeChain,
+			int lengthOfChain){
+		try {
+    		writer.write("Step: " + step + "\r\n");
+        	writer.write("Merge Chain: " + 
+		    		GraphVertexOperation.convertBinaryStringToGenecode(
+		    						BitwiseOperation.convertBytesToBinaryString(
+		    								mergeChain)) + "\r\n");
+        	writer.write("Chain Length: " + lengthOfChain + "\r\n");
+			writer.write("\r\n");
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java
new file mode 100644
index 0000000..a6e4a6c
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphInputFormat.java
@@ -0,0 +1,87 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexInputFormat;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
+import edu.uci.ics.pregelix.type.KmerCountValue;
+import edu.uci.ics.pregelix.type.State;
+
+public class LogAlgorithmForMergeGraphInputFormat extends
+	BinaryVertexInputFormat<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable>{
+
+	/**
+	 * Format INPUT
+	 */
+    @Override
+    public VertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new BinaryLoadGraphReader(binaryInputFormat.createRecordReader(split, context));
+    }
+    
+    @SuppressWarnings("rawtypes")
+    class BinaryLoadGraphReader extends
+            BinaryVertexReader<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> {
+        private final static String separator = " ";
+        private Vertex vertex;
+        private BytesWritable vertexId = new BytesWritable();
+        private ValueStateWritable vertexValue = new ValueStateWritable();
+
+        public BinaryLoadGraphReader(RecordReader<BytesWritable,KmerCountValue> recordReader) {
+            super(recordReader);
+        }
+
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public Vertex<BytesWritable, ValueStateWritable, NullWritable, LogAlgorithmMessageWritable> getCurrentVertex() throws IOException,
+                InterruptedException {
+            if (vertex == null)
+                vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+            vertex.getMsgList().clear();
+            vertex.getEdges().clear();
+            
+            
+            if(getRecordReader() != null){
+	            /**
+	             * set the src vertex id
+	             */
+	            
+        		vertexId = getRecordReader().getCurrentKey();
+        		vertex.setVertexId(vertexId);
+	            /**
+	             * set the vertex value
+	             */
+	            KmerCountValue kmerCountValue = getRecordReader().getCurrentValue();
+	            vertexValue.setValue(kmerCountValue.getAdjBitMap()); 
+	            vertexValue.setState(State.NON_VERTEX);
+	            vertex.setVertexValue(vertexValue);
+	            
+	        	String kmer = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId.getBytes(),GraphVertexOperation.k);
+			    System.out.println("key: " + kmer);
+			    System.out.println("code: " + GraphVertexOperation.convertBinaryStringToGenecode(kmer));
+			    System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(kmerCountValue.getAdjBitMap()));
+			    System.out.println();
+            }
+            
+            return vertex;
+        }
+    }
+	
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
new file mode 100644
index 0000000..9d4672a
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphOutputFormat.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.binary.BinaryVertexOutputFormat;
+import edu.uci.ics.pregelix.example.io.ValueStateWritable;
+
+public class LogAlgorithmForMergeGraphOutputFormat extends 
+	BinaryVertexOutputFormat<BytesWritable, ValueStateWritable, NullWritable> {
+
+        @Override
+        public VertexWriter<BytesWritable, ValueStateWritable, NullWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<BytesWritable, ByteWritable> recordWriter = binaryOutputFormat.getRecordWriter(context);
+            return new BinaryLoadGraphVertexWriter(recordWriter);
+        }
+        
+        /**
+         * Simple VertexWriter that supports {@link BinaryLoadGraphVertex}
+         */
+        public static class BinaryLoadGraphVertexWriter extends
+                BinaryVertexWriter<BytesWritable, ValueStateWritable, NullWritable> {
+            public BinaryLoadGraphVertexWriter(RecordWriter<BytesWritable, ByteWritable> lineRecordWriter) {
+                super(lineRecordWriter);
+            }
+
+            @Override
+            public void writeVertex(Vertex<BytesWritable, ValueStateWritable, NullWritable, ?> vertex) throws IOException,
+                    InterruptedException {
+                getRecordWriter().write(vertex.getVertexId(),
+                		new ByteWritable(vertex.getVertexValue().getValue()));
+            }
+        }
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 3561550..68c201b 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -1,6 +1,9 @@
 package edu.uci.ics.pregelix;
 
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
@@ -14,6 +17,7 @@
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.pregelix.example.client.Client;
 import edu.uci.ics.pregelix.example.io.LogAlgorithmMessageWritable;
@@ -23,9 +27,9 @@
 
 /*
  * vertexId: BytesWritable
- * vertexValue: ByteWritable
+ * vertexValue: ValueStateWritable
  * edgeValue: NullWritable
- * message: MessageWritable
+ * message: LogAlgorithmMessageWritable
  * 
  * DNA:
  * A: 00
@@ -54,35 +58,52 @@
 	private byte[] tmpSourceVertextId;
 	private byte[] tmpDestVertexId;
 	private byte[] tmpChainVertexId;
+	private byte[] mergeChainVertexId;
+	private int lengthOfMergeChainVertex;
 	private byte tmpVertexValue;
 	private int tmpVertexState;
 	private int tmpMessage;
 	private ValueStateWritable tmpVal = new ValueStateWritable();
 	private LogAlgorithmMessageWritable tmpMsg = new LogAlgorithmMessageWritable();
+	OutputStreamWriter writer; 
 	/**
-	 * For test, in compute method, make each vertexValue shift 1 to left.
-	 * It will be modified when going forward to next step.
+	 * Log Algorithm for path merge graph
 	 */
 	@Override
 	public void compute(Iterator<LogAlgorithmMessageWritable> msgIterator) {
+		try {
+			writer = new OutputStreamWriter(new FileOutputStream("test/check",true));
+		} catch (FileNotFoundException e1) { e1.printStackTrace();}
 		if (getSuperstep() == 1) {
 			tmpVal = getVertexValue();
 			tmpVertexValue = tmpVal.getValue();
+			tmpChainVertexId = new byte[0];
+			tmpMsg.setChainVertexId(tmpChainVertexId);
 			if(GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))){
 				tmpMsg.setMessage(Message.START);
 				tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
 				sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+				//test
+				GraphVertexOperation.testLogMessageCommunication(writer, getSuperstep(),
+						getVertexId().getBytes(), tmpDestVertexId, tmpMsg);
 				voteToHalt();
 			}
 			else if(GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
 				tmpMsg.setMessage(Message.END);
-				tmpDestVertexId = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), tmpVertexValue);
+				tmpDestVertexId = GraphVertexOperation.getLeftDestVertexId(getVertexId().getBytes(), tmpVertexValue);
 				sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+				//test
+				GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(), 
+						tmpDestVertexId, tmpMsg, tmpVal);
 				voteToHalt();
 			}
 			else if(GraphVertexOperation.isPathVertex(new ByteWritable(tmpVertexValue))){
+				tmpVal = getVertexValue();
 				tmpVal.setState(State.MID_VERTEX);
 				setVertexValue(tmpVal);
+				//test
+				GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(), 
+						null, null, tmpVal);
 			}
 			else
 				voteToHalt();
@@ -92,32 +113,65 @@
 				tmpMsg = msgIterator.next();
 				tmpMessage = tmpMsg.getMessage();
 				tmpVertexState = getVertexValue().getState();
+				tmpVal = getVertexValue();
 				if(tmpMessage == Message.START && tmpVertexState == State.MID_VERTEX){
 					tmpVal.setState(State.START_VERTEX);
 					setVertexValue(tmpVal);
+					//test
+					GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(), 
+							null, null, tmpVal);
 				}
 				else if(tmpMessage == Message.END && tmpVertexState == State.MID_VERTEX){
 					tmpVal.setState(State.END_VERTEX);
 					setVertexValue(tmpVal);
+					//test
+					GraphVertexOperation.testSetVertexState(writer, getSuperstep(), getVertexId().getBytes(), 
+							null, null, tmpVal);
 				}
 			}
 		}
 		//head node sends message to path node
 		else if(getSuperstep()%3 == 0){
-			tmpVertexState = getVertexValue().getState();
-			tmpSourceVertextId = getVertexId().getBytes();
-			tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
-					getVertexValue().getValue());
-			if(tmpVertexState == State.START_VERTEX){
-				tmpMsg.setMessage(Message.START);
-				tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
-				sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
-			}
-			else if(tmpVertexState == State.END_VERTEX){
-				tmpMsg.setMessage(Message.NON);
-				tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
-				sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
-			}
+			tmpVal = getVertexValue();
+			tmpVertexValue = tmpVal.getValue();
+			//if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
+			//		&& !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
+				if(msgIterator.hasNext())
+					tmpMsg = msgIterator.next();
+				else
+					tmpMsg = new LogAlgorithmMessageWritable();
+				tmpVertexState = getVertexValue().getState();
+				tmpSourceVertextId = getVertexId().getBytes();
+				if(lengthOfMergeChainVertex == 0)
+					tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
+						getVertexValue().getValue());
+				else{
+					byte[] lastKmer = GraphVertexOperation.getLastKmer(mergeChainVertexId, 
+							lengthOfMergeChainVertex);
+					tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, getVertexValue().getValue());
+				}
+				
+				/*if(tmpMsg.getLengthOfChain()== 0){
+					tmpMsg.setLengthOfChain(GraphVertexOperation.k);
+					tmpMsg.setChainVertexId(getVertexId().getBytes());
+				}*/
+				if(tmpVertexState == State.START_VERTEX){
+					tmpMsg.setMessage(Message.START);
+					tmpMsg.setSourceVertexId(tmpSourceVertextId);
+					sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+					//test
+					GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+							tmpDestVertexId, tmpMsg, null);
+				}
+				else if(tmpVertexState != State.END_VERTEX){
+					tmpMsg.setMessage(Message.NON);
+					tmpMsg.setSourceVertexId(tmpSourceVertextId);
+					sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+					//test
+					GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+							tmpDestVertexId, tmpMsg, null);
+				}
+			//}
 		}
 		//path node sends message back to head node
 		else if(getSuperstep()%3 == 1){
@@ -125,32 +179,59 @@
 				tmpVal = getVertexValue();
 				tmpMsg = msgIterator.next();
 				tmpMessage = tmpMsg.getMessage();
-				tmpSourceVertextId = getVertexId().getBytes();
-				tmpMsg.setChainVertexId(tmpSourceVertextId);
-				byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
-						tmpVal.getValue());
-				tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
+				tmpSourceVertextId = tmpMsg.getSourceVertexId();
+				tmpChainVertexId = tmpMsg.getChainVertexId();
+				//byte[] tmpBytes = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
+				//		tmpVal.getValue());
+				tmpMsg.setSourceVertexId(getVertexId().getBytes());
+				tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
 				tmpMsg.setSourceVertexState(tmpVal.getState());
+				/*tmpMsg.incrementLength();
+				tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(
+						tmpChainVertexId,
+						tmpMsg.getLengthOfChain()-1,
+						getVertexId().getBytes()));*/
 				sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+				//test
+				GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+						tmpSourceVertextId, tmpMsg, tmpSourceVertextId);
 				//kill Message because it has been merged by the head
 				if(tmpMessage == Message.START){
 					tmpVal.setState(State.TODELETE);
 					setVertexValue(tmpVal);
 				}
 			}
-			else
-				deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf because it doesn't receive any message
+			else{
+				if(!GraphVertexOperation.isHead(new ByteWritable(getVertexValue().getValue()))
+						&& !GraphVertexOperation.isRear(new ByteWritable(getVertexValue().getValue()))
+						&& getVertexValue().getState() != State.START_VERTEX){
+
+					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "not receive any message");
+					deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
+				}
+			}
 		}
 		else if(getSuperstep()%3 == 2){
 			if(msgIterator.hasNext()){
 				tmpMsg = msgIterator.next();
 				tmpVertexState = getVertexValue().getState();
-				if(tmpVertexState == State.TODELETE)
+				tmpSourceVertextId = getVertexId().getBytes();
+				if(tmpVertexState == State.TODELETE){
+					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "already merged by head");
 					deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
-				setVertexId(GraphVertexOperation.mergeTwoChainVertex(getVertexId().getBytes(), 
-						tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()));
-				byte[] tmpByte = tmpMsg.getSourceVertexIdOrNeighberInfo();
-				tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte[0]);
+				}
+				if(getSuperstep() == 5){
+					lengthOfMergeChainVertex = GraphVertexOperation.k;
+					mergeChainVertexId = getVertexId().getBytes();
+				}
+				mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
+						tmpMsg.getSourceVertexId());
+				lengthOfMergeChainVertex++;
+				//test
+				GraphVertexOperation.testMergeChainVertex(writer, getSuperstep(),
+						mergeChainVertexId, lengthOfMergeChainVertex);
+				byte tmpByte = tmpMsg.getNeighberInfo();
+				tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
 				tmpVal = getVertexValue();
 				tmpVal.setValue(tmpVertexValue);
 				setVertexValue(tmpVal);
@@ -158,6 +239,9 @@
 					voteToHalt();
 			}
 		}
+		try {
+			writer.close();
+		} catch (IOException e) { e.printStackTrace(); }
 	}
 	
    private void signalTerminate() {
@@ -195,8 +279,8 @@
         /**
          * BinaryInput and BinaryOutput~/
          */
-        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class); 
-        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class); 
+        job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class); 
+        job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class); 
         job.setOutputKeyClass(BytesWritable.class);
         job.setOutputValueClass(ByteWritable.class);
         Client.run(args, job);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
index 558b4fb..3e02d2f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
@@ -62,7 +62,7 @@
 	private MessageWritable tmpMsg = new MessageWritable();
 	OutputStreamWriter writer; 
 	/**
-	 * Naive Algorithm for merge graph
+	 * Naive Algorithm for path merge graph
 	 */
 	@Override
 	public void compute(Iterator<MessageWritable> msgIterator) {
@@ -97,7 +97,6 @@
 						if(tmpChainVertexId.length == 0){
 							tmpMsg.setLengthOfChain(GraphVertexOperation.k);
 							tmpMsg.setChainVertexId(tmpVertextId);
-							sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
 						}
 						else{
 							tmpMsg.incrementLength();
@@ -105,9 +104,9 @@
 									tmpChainVertexId,
 									tmpMsg.getLengthOfChain()-1,
 									tmpVertextId));
-							sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
 							deleteVertex(getVertexId());
 						}
+						sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
 						//test
 						GraphVertexOperation.testMessageCommunication(writer,getSuperstep(),tmpVertextId,
 								tmpSourceVertextId,tmpMsg);
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
index e2d33c1..262ce7f 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/LogAlgorithmMessageWritable.java
@@ -16,30 +16,47 @@
 	 * chainVertexId stores the chains of connected DNA
 	 * file stores the point to the file that stores the chains of connected DNA
 	 */
-	private byte[] sourceVertexIdOrNeighberInfo;
+	private byte[] sourceVertexId;
+	private byte neighberInfo;
 	private int lengthOfChain;
 	private byte[] chainVertexId;
 	private File file;
 	private int message;
 	private int sourceVertexState;
 	
-	public LogAlgorithmMessageWritable(){		
+	public LogAlgorithmMessageWritable(){
+		sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
 	}
 	
-	public void set(byte[] sourceVertexIdOrNeighberInfo, byte[] chainVertexId, File file){
-		this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+	public void set(byte[] sourceVertexId,byte neighberInfo, byte[] chainVertexId, File file){
+		this.sourceVertexId = sourceVertexId;
 		this.chainVertexId = chainVertexId;
 		this.file = file;
 		this.message = 0;
 		this.lengthOfChain = 0;
 	}
-
-	public byte[] getSourceVertexIdOrNeighberInfo() {
-		return sourceVertexIdOrNeighberInfo;
+	
+	public void reset(){
+		sourceVertexId = null;
+		neighberInfo = (Byte) null;
+		message = 0;
+		sourceVertexState = 0;
 	}
 
-	public void setSourceVertexIdOrNeighberInfo(byte[] sourceVertexIdOrNeighberInfo) {
-		this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+	public byte[] getSourceVertexId() {
+		return sourceVertexId;
+	}
+
+	public void setSourceVertexId(byte[] sourceVertexId) {
+		this.sourceVertexId = sourceVertexId;
+	}
+
+	public byte getNeighberInfo() {
+		return neighberInfo;
+	}
+
+	public void setNeighberInfo(byte neighberInfo) {
+		this.neighberInfo = neighberInfo;
 	}
 
 	public byte[] getChainVertexId() {
@@ -92,7 +109,8 @@
 		out.writeInt(lengthOfChain);
 		if(lengthOfChain != 0)
 			out.write(chainVertexId);
-		out.write(sourceVertexIdOrNeighberInfo);
+		out.write(sourceVertexId);
+		out.write(neighberInfo);
 		out.writeInt(message);
 		out.writeInt(sourceVertexState);
 	}
@@ -107,11 +125,9 @@
 		}
 		else
 			chainVertexId = new byte[0];
-		if(lengthOfChain % 2 == 0)
-			sourceVertexIdOrNeighberInfo = new byte[(GraphVertexOperation.k-1)/4 + 1];
-		else
-			sourceVertexIdOrNeighberInfo = new byte[1];
-		in.readFully(sourceVertexIdOrNeighberInfo);
+		sourceVertexId = new byte[(GraphVertexOperation.k-1)/4 + 1];
+		in.readFully(sourceVertexId);
+		neighberInfo = in.readByte();
 		message = in.readInt();
 		sourceVertexState = in.readInt();
 	}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
index 7861247..151caae 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
@@ -57,20 +57,6 @@
 	}
 
 	public void setChainVertexId(byte[] chainVertexId) {
-		/*if(lengthOfChain == 0){
-			this.chainVertexId = chainVertexId;
-			return;
-		}
-		int numOfByte = (2*lengthOfChain-1)/8 + 1;
-		if(chainVertexId.length == numOfByte)
-			this.chainVertexId = chainVertexId;
-		else{
-			byte[] tmp = new byte[numOfByte];
-			for(int i = 0; i < numOfByte; i++)
-				tmp[i] = chainVertexId[i];
-			this.chainVertexId = tmp;
-		}*/
-		
 		this.chainVertexId = chainVertexId;
 	}
 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
index 85f99c2..5280fef 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/Message.java
@@ -3,7 +3,7 @@
 public class Message {
 	
 	public static final int NON = 0;
-	public static final int START = 0;
-	public static final int END = 0;
+	public static final int START = 1;
+	public static final int END = 2;
 	
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 52e9f38..c0531ff 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -13,9 +13,14 @@
 import edu.uci.ics.pregelix.BinaryLoadGraphInputFormat;
 import edu.uci.ics.pregelix.BinaryLoadGraphOutputFormat;
 import edu.uci.ics.pregelix.LoadGraphVertex;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphInputFormat;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphOutputFormat;
+import edu.uci.ics.pregelix.LogAlgorithmForMergeGraphVertex;
 import edu.uci.ics.pregelix.MergeGraphVertex;
 import edu.uci.ics.pregelix.LoadGraphVertex.SimpleLoadGraphVertexOutputFormat;
+import edu.uci.ics.pregelix.TestLoadGraphVertex;
 import edu.uci.ics.pregelix.TextLoadGraphInputFormat;
+import edu.uci.ics.pregelix.testDeleteVertexId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 
@@ -56,13 +61,30 @@
     	generateBinaryLoadGraphJob("BinaryLoadGraph", outputBase + "BinaryLoadGraph.xml");
     }
     
+    private static void generateLogAlgorithmForMergeGraphJob(String jobName, String outputPath) throws IOException {
+    	PregelixJob job = new PregelixJob(jobName);
+    	job.setVertexClass(LogAlgorithmForMergeGraphVertex.class);
+        job.setVertexInputFormatClass(LogAlgorithmForMergeGraphInputFormat.class); 
+        job.setVertexOutputFormatClass(LogAlgorithmForMergeGraphOutputFormat.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(ByteWritable.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+    
+    private static void genLogAlgorithmForMergeGraph() throws IOException {
+    	generateLogAlgorithmForMergeGraphJob("LogAlgorithmForMergeGraph", outputBase + "LogAlgorithmForMergeGraph.xml");
+    }
+    
 	/**
 	 * @param args
 	 * @throws IOException 
 	 */
 	public static void main(String[] args) throws IOException {
 		// TODO Auto-generated method stub
-		genBinaryLoadGraph();
+		genLogAlgorithmForMergeGraph();
+		//genBinaryLoadGraph();
 		//genSequenceLoadGraph();
 		//genBasicBinaryLoadGraph();
 	}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
index 1143c95..e3fa41c 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestCase.java
@@ -50,7 +50,7 @@
         job.setJobName(jobName);
         this.resultFileName = resultFile;
         this.expectedFileName = expectedFile;
-        giraphJobGens = new JobGen[4];
+        giraphJobGens = new JobGen[1];
         giraphJobGens[0] = new JobGenOuterJoin(job);
         /*waitawhile();
         giraphJobGens[1] = new JobGenInnerJoin(job);