Update MergeGraph

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3002 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
new file mode 100644
index 0000000..6e832a6
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -0,0 +1,227 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import edu.uci.ics.pregelix.SequenceFile.GenerateSequenceFile;
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
+
+public class GraphVertexOperation {
+	public static final int k = 3; //kmer, k: the length of kmer
+	static private final Path TMP_DIR = new Path(
+			GenerateSequenceFile.class.getSimpleName() + "_INTERIM");
+	/**
+	 * Single Vertex: in-degree = out-degree = 1
+	 * @param vertexValue 
+	 */
+	public static boolean isPathVertex(ByteWritable vertexValue){
+		byte value = vertexValue.get();
+		byte[] bit = new byte[8];
+		for(int i = 0; i < 8; i++)
+			bit[i] = (byte) ((value >> i) & 0x01);
+		
+		//check out-degree
+		if(((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+				){
+			//check in-degree
+			if(((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+					)
+				return true;
+			else
+				return false;
+		}
+		else
+			return false;
+	}
+	/**
+	 * Head Vertex:  out-degree = 1, in-degree != 1
+	 * @param vertexValue 
+	 */
+	public static boolean isHead(ByteWritable vertexValue){
+		byte value = vertexValue.get();
+		byte[] bit = new byte[8];
+		for(int i = 0; i < 8; i++)
+			bit[i] = (byte) ((value >> i) & 0x01);
+		
+		//check out-degree
+		if(((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+				|| ((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+				){
+			//check in-degree
+			if(!((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+					&& !((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+					&& !((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+					&& !((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+					)
+				return true;
+			else
+				return false;
+		}
+		else
+			return false;
+	}
+	/**
+	 * Rear Vertex:  out-degree != 1, in-degree = 1
+	 * @param vertexValue 
+	 */
+	public static boolean isRear(ByteWritable vertexValue){
+		byte value = vertexValue.get();
+		byte[] bit = new byte[8];
+		for(int i = 0; i < 8; i++)
+			bit[i] = (byte) ((value >> i) & 0x01);
+		
+		//check out-degree
+		if(!((bit[0]==1)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==0))
+				&& !((bit[0]==0)&&(bit[1]==1)&&(bit[2]==0)&&(bit[3]==0))
+				&& !((bit[0]==0)&&(bit[1]==0)&&(bit[2]==1)&&(bit[3]==0))
+				&& !((bit[0]==0)&&(bit[1]==0)&&(bit[2]==0)&&(bit[3]==1))
+				){
+			//check in-degree
+			if(((bit[4]==1)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==1)&&(bit[6]==0)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==1)&&(bit[7]==0))
+					|| ((bit[4]==0)&&(bit[5]==0)&&(bit[6]==0)&&(bit[7]==1))
+					)
+				return true;
+			else
+				return false;
+		}
+		else
+			return false;
+	}
+	/**
+	 * write Kmer to Sequence File for test
+	 * @param arrayOfKeys
+	 * @param arrayOfValues
+	 * @param step
+	 * @throws IOException
+	 */
+	public void writeKmerToSequenceFile(ArrayList<BytesWritable> arrayOfKeys, ArrayList<ByteWritable> arrayOfValues, long step) throws IOException{
+		
+		Configuration conf = new Configuration();
+	    Path outDir = new Path(TMP_DIR, "out");
+	    Path outFile = new Path(outDir, "B" + Long.toString(step));
+	    FileSystem fileSys = FileSystem.get(conf);
+	    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+	        outFile, BytesWritable.class, ByteWritable.class, 
+	        CompressionType.NONE);
+	    
+	     //wirte to sequence file
+	     for(int i = 0; i < arrayOfKeys.size(); i++)
+	    	 writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+	     writer.close();
+	}
+	/**
+	 * check what kind of succeed node
+	 * return 0:A 1:C 2:G 3:T 4:nothing
+	 */
+	public static int findSucceedNode(byte vertexValue){
+		String firstBit = "00000001"; //A
+		String secondBit = "00000010"; //C
+		String thirdBit = "00000100"; //G
+		String fourthBit = "00001000"; //T
+		int first = BitwiseOperation.convertBinaryStringToByte(firstBit) & 0xff;
+		int second = BitwiseOperation.convertBinaryStringToByte(secondBit) & 0xff;
+		int third = BitwiseOperation.convertBinaryStringToByte(thirdBit) & 0xff;
+		int fourth = BitwiseOperation.convertBinaryStringToByte(fourthBit) & 0xff;
+		int value = vertexValue & 0xff;
+		int tmp = value & first;
+		if(tmp != 0)
+			return 0;
+		else{
+			tmp = value & second;
+			if(tmp != 0)
+				return 1;
+			else{
+				tmp = value & third;
+				if(tmp != 0)
+					return 2;
+				else{
+					tmp = value & fourth;
+					if(tmp != 0)
+						return 3;
+					else
+						return 4;
+				}
+			}
+		}
+	}
+	/**
+	 * replace last two bits based on n
+	 * Ex. 01 10 00(nothing)	->	01 10 00(A)/01(C)/10(G)/11(T)		
+	 */
+	public static byte[] replaceLastTwoBits(byte[] vertexId, int n){
+		String binaryStringVertexId = BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, 3);
+		String resultString = "";
+		for(int i = 0; i < binaryStringVertexId.length()-2; i++)
+			resultString += binaryStringVertexId.charAt(i);
+		switch(n){
+		case 0:
+			resultString += "00";
+			break;
+		case 1:
+			resultString += "01";
+			break;
+		case 2:
+			resultString += "10";
+			break;
+		case 3:
+			resultString += "11";
+			break;
+		default:
+			break;
+		}
+	
+		return BitwiseOperation.convertBinaryStringToBytes(resultString);
+	}
+	/**
+	 * find the vertexId of the destination node
+	 */
+	public static byte[] getDestVertexId(byte[] sourceVertexId, byte vertexValue){
+		byte[] destVertexId = BitwiseOperation.shiftBitsLeft(sourceVertexId, 2);
+		return replaceLastTwoBits(destVertexId, findSucceedNode(vertexValue));
+	}
+	/**
+	 * update the chain vertexId
+	 */
+	public static byte[] updateChainVertexId(byte[] chainVertexId, int lengthOfChainVertex, byte[] newVertexId){
+		return BitwiseOperation.addLastTwoBits(chainVertexId,lengthOfChainVertex,BitwiseOperation.getLastTwoBits(newVertexId,k));
+	}
+	/**
+	 * get the first kmer from chainVertexId
+	 */
+	public static byte[] getFirstKmer(byte[] chainVertexId){
+		String originalVertexId = BitwiseOperation.convertBytesToBinaryString(chainVertexId);
+		return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(0,k-1));
+	}
+	/**
+	 * get the last kmer from chainVertexId
+	 */
+	public static byte[] getLastKmer(byte[] chainVertexId, int lengthOfChainVertex){
+		String originalVertexId = BitwiseOperation.convertBytesToBinaryString(chainVertexId);
+		return BitwiseOperation.convertBinaryStringToBytes(originalVertexId.substring(lengthOfChainVertex-1-k+1,lengthOfChainVertex-1));
+	}
+	/**
+	 * read vertexId from RecordReader
+	 */
+	public static BytesWritable readVertexIdFromRecordReader(BytesWritable currentKey){
+		String finalBinaryString = BitwiseOperation.convertBytesToBinaryStringKmer(currentKey.getBytes(),k);
+		return new BytesWritable(BitwiseOperation.convertBinaryStringToBytes(finalBinaryString));
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
new file mode 100644
index 0000000..2325ac3
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/MergeGraphVertex.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.pregelix;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.io.MessageWritable;
+import edu.uci.ics.pregelix.hdfs.HDFSOperation;
+
+/*
+ * vertexId: BytesWritable
+ * vertexValue: ByteWritable
+ * edgeValue: NullWritable
+ * message: MessageWritable
+ * 
+ * DNA:
+ * A: 00
+ * C: 01
+ * G: 10
+ * T: 11
+ * 
+ * succeed node
+ *  A 00000001 1
+ *  G 00000010 2
+ *  C 00000100 4
+ *  T 00001000 8
+ * precursor node
+ *  A 00010000 16
+ *  G 00100000 32
+ *  C 01000000 64
+ *  T 10000000 128
+ *  
+ * For example, ONE LINE in input file: 00,01,10	0001,0010,
+ * That means that vertexId is ACG, its succeed node is A and its precursor node is C.
+ * The succeed node and precursor node will be stored in vertexValue and we don't use edgeValue.
+ * The details about message are in edu.uci.ics.pregelix.example.io.MessageWritable. 
+ */
+public class MergeGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
+	
+	private byte[] tmpSourceVertextId;
+	private byte[] tmpDestVertexId;
+	private byte[] tmpChainVertexId;
+	private MessageWritable tmpMsg = new MessageWritable();
+	public static final int k = 3; //kmer, k = 3
+	/**
+	 * For test, in compute method, make each vertexValue shift 1 to left.
+	 * It will be modified when going forward to next step.
+	 */
+	@Override
+	public void compute(Iterator<MessageWritable> msgIterator) {
+		if (getSuperstep() == 1) {
+			if(GraphVertexOperation.isHead(getVertexValue())){
+				tmpSourceVertextId = getVertexId().getBytes(); 
+				tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, getVertexValue().get());
+				tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+				tmpChainVertexId = new byte[0];
+				tmpMsg.setChainVertexId(tmpChainVertexId);
+				sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+				}
+		}
+		//path node sends message back to head node
+		else if(getSuperstep()%2 == 0){
+			 if(msgIterator.hasNext()){
+				if(GraphVertexOperation.isPathVertex(getVertexValue())){
+					tmpMsg = msgIterator.next();
+					tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+					byte[] tmpBytes = GraphVertexOperation.getDestVertexId(getVertexId().getBytes(), getVertexValue().get());
+					tmpMsg.setSourceVertexIdOrNeighberInfo(tmpBytes); //set neighber
+					tmpChainVertexId = tmpMsg.getChainVertexId();
+					if(tmpChainVertexId.length == 0){
+						tmpMsg.setChainVertexId(getVertexId().getBytes());
+						tmpMsg.setLengthOfChain(k);
+					}
+					else{
+						tmpMsg.setChainVertexId(GraphVertexOperation.updateChainVertexId(tmpChainVertexId,tmpMsg.getLengthOfChain(),getVertexId().getBytes()));
+						tmpMsg.incrementLength();
+					}
+					sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+				}
+				else if(GraphVertexOperation.isRear(getVertexValue())){
+					tmpMsg = msgIterator.next();
+					tmpSourceVertextId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+					tmpMsg.setSourceVertexIdOrNeighberInfo(getVertexId().getBytes());
+					tmpMsg.setRear(true);
+					sendMsg(new BytesWritable(tmpSourceVertextId),tmpMsg);
+				}
+				else voteToHalt();
+			}
+		}
+		//head node sends message to path node
+		else if(getSuperstep()%2 == 1){
+			while (msgIterator.hasNext()){
+				tmpMsg = msgIterator.next();
+				if(!tmpMsg.isRear()){
+					tmpSourceVertextId = getVertexId().getBytes();
+					tmpDestVertexId = tmpMsg.getSourceVertexIdOrNeighberInfo();
+					tmpMsg.setSourceVertexIdOrNeighberInfo(tmpSourceVertextId);
+					sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+				}
+				else{
+					
+					try {
+						HDFSOperation hdfsOperation = new HDFSOperation();
+						HDFSOperation.insertHDFSFile("testHDFS/chainVertex", tmpMsg.getLengthOfChain(), tmpMsg.getChainVertexId());
+					} catch (IOException e) { e.printStackTrace(); }
+					signalTerminate();
+				}
+			}
+		}
+	}
+	
+   private void signalTerminate() {
+        Configuration conf = getContext().getConfiguration();
+        try {
+            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+            writeMergeGraphResult(conf, true);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+   
+   private void writeMergeGraphResult(Configuration conf, boolean terminate) {
+       try {
+           FileSystem dfs = FileSystem.get(conf);
+           String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "MergeGraph";
+           Path path = new Path(pathStr);
+           if (!dfs.exists(path)) {
+               FSDataOutputStream output = dfs.create(path, true);
+               output.writeBoolean(terminate);
+               output.flush();
+               output.close();
+           }
+       } catch (IOException e) {
+           throw new IllegalStateException(e);
+       }
+   }
+
+	/**
+	 * @param args
+	 */
+	public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(MergeGraphVertex.class.getSimpleName());
+        job.setVertexClass(MergeGraphVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class); 
+        job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class); 
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(ByteWritable.class);
+        Client.run(args, job);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
index e7c022d..304d764 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/ConvertToSequenceFile.java
@@ -33,7 +33,7 @@
 		job.setOutputFormatClass(SequenceFileOutputFormat.class);
 		job.setInputFormatClass(TextInputFormat.class);
 		
-		TextInputFormat.addInputPath(job, new Path("folder/test.dat"));
+		TextInputFormat.addInputPath(job, new Path("data/webmap/part-00000"));
 		SequenceFileOutputFormat.setOutputPath(job, new Path("folder_seq"));
 		
 		// submit and wait for completion
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
index 66676ba..e52f979 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -1,6 +1,6 @@
 package edu.uci.ics.pregelix.SequenceFile;
 
-import java.math.BigInteger;
+import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
@@ -8,29 +8,35 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.pregelix.bitwise.BitwiseOperation;
 
 public class GenerateSequenceFile {
 	
 	static private final Path TMP_DIR = new Path(
 			GenerateSequenceFile.class.getSimpleName() + "_TMP");
+	private static Path outDir = new Path(TMP_DIR, "out");
+	private final static int k = 3;
 	
-	
-	
-	 public static void main(String[] argv) throws Exception {
-		 
-	     //write output to a file
+	/**
+	 * create test.dat
+	 * A - ACG - A		000110	00010001	06	11
+	 * C - ACT - C		000111	00100010	07	22
+	 * G - CGT - G		011011	01000100	1B	44
+	 * T - GTC - T		101101	10001000	2D	88
+	 */
+	public static void createTestDat() throws IOException{
+		 //write output to a file
 		 Configuration conf = new Configuration();
-	     Path outDir = new Path(TMP_DIR, "out");
-	     Path outFile = new Path(outDir, "reduce-out");
+	     Path outFile = new Path(outDir, "test-out.dat");
 	     FileSystem fileSys = FileSystem.get(conf);
 	     SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
 	         outFile, BytesWritable.class, ByteWritable.class, 
 	         CompressionType.NONE);
 	     
+
 		 //Generate <key,value>  <BytesWritable, ByteWritable>
 		 byte[] key = hexStringToByteArray("06"); //000110
 		 byte[] value = hexStringToByteArray("11"); //00010001
@@ -69,28 +75,177 @@
 	     for(int i = 0; i < arrayOfKeys.size(); i++)
 	    	 writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
 	     writer.close();
-	      
+	     
 	     //read outputs
-	     Path inFile = new Path(outDir, "reduce-out");
+	     Path inFile = new Path(outDir, "test-out.dat");
 	     BytesWritable outKey = new BytesWritable();
 	     ByteWritable outValue = new ByteWritable();
 	     SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
 	     try {
-	       reader.next(outKey, outValue);
+	        reader.next(outKey, outValue);
 		     System.out.println(outKey.getBytes());
 		     System.out.println(outValue.get());
 	     } finally {
 	       reader.close();
 	     }
+	}
+	
+    /**
+     * create a mergeTest SequenceFile
+     * CAG - AGC - GCG - CGT - GTA - TAT - ATA 
+     * GAG 								   ATC	
+     * 
+     * CAG	010010	00000010	
+     * AGC	001001	01100100	
+     * GCG	100110	00011000
+     * CGT	011011	01000001
+     * GTA	101100	00101000
+     * TAT	110011	01000011
+     * ATA	001100	10000000
+     * GAG	100010	00000010
+     * ATC	001101	10000000
+     */
+	public static void createMergeTest() throws IOException{
+		 //write output to a file
+		 Configuration conf = new Configuration();
+	     Path outFile = new Path(outDir, "mergeTest");
+	     FileSystem fileSys = FileSystem.get(conf);
+	     SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+	         outFile, BytesWritable.class, ByteWritable.class, 
+	         CompressionType.NONE);
+	     
+
+		 //Generate <key,value>  <BytesWritable, ByteWritable>
+	     // 1
+	     String tmpKey = "010010";
+		 byte[] key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+		 String tmpValue = "00000010";
+		 byte value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+		 BytesWritable keyWritable = new BytesWritable(key);
+		 ByteWritable valueWritable = new ByteWritable(value);
+	     
+	     ArrayList<BytesWritable> arrayOfKeys = new ArrayList<BytesWritable>();
+	     arrayOfKeys.add(keyWritable);
+	     ArrayList<ByteWritable> arrayOfValues = new ArrayList<ByteWritable>();
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 2
+	     tmpKey = "001001";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "01100100";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 3
+	     tmpKey = "100110";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "00011000";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 4
+	     tmpKey = "011011";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "01000001";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 5
+	     tmpKey = "101100";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "00101000";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 6
+	     tmpKey = "110011";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "01000011";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+
+	     // 7
+	     tmpKey = "001100";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "10000000";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 8
+	     tmpKey = "100010";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "00000010";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     // 9
+	     tmpKey = "001101";
+	     key = BitwiseOperation.convertBinaryStringToBytes(tmpKey);
+	     tmpValue = "10000000";
+	     value = BitwiseOperation.convertBinaryStringToByte(tmpValue);
+	     keyWritable = new BytesWritable(key);
+	     valueWritable = new ByteWritable(value);
+	     arrayOfKeys.add(keyWritable);
+	     arrayOfValues.add(valueWritable);
+	     
+	     //wirte to sequence file
+	     for(int i = 0; i < arrayOfKeys.size(); i++)
+	    	 writer.append(arrayOfKeys.get(i), arrayOfValues.get(i));
+	     writer.close();
+	     
+	     //read outputs
+	     Path inFile = new Path(outDir, "mergeTest");
+	     BytesWritable outKey = new BytesWritable();
+	     ByteWritable outValue = new ByteWritable();
+	     SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, conf);
+	     int iteration = 1;
+	     try {
+	         while(reader.next(outKey, outValue)){
+	        	 System.out.println(iteration);
+			     System.out.println("key: " + BitwiseOperation.convertBytesToBinaryStringKmer(outKey.getBytes(),k));
+			     System.out.println("value: " + BitwiseOperation.convertByteToBinaryString(outValue.get()));
+			     System.out.println();
+			     iteration++;
+	         }
+	     } finally {
+	       reader.close();
+	     }
+	}
+	
+	 public static void main(String[] argv) throws Exception {
+		 //createTestDat();
+		 createMergeTest();
+		 createTestDat();
 	 }
 	 
 	 public static byte[] hexStringToByteArray(String s) {
-		    int len = s.length();
-		    byte[] data = new byte[len / 2];
-		    for (int i = 0; i < len; i += 2) {
-		        data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
-		                             + Character.digit(s.charAt(i+1), 16));
-		    }
-		    return data;
-		}
+	    int len = s.length();
+	    byte[] data = new byte[len / 2];
+	    for (int i = 0; i < len; i += 2) {
+	        data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+	                             + Character.digit(s.charAt(i+1), 16));
+	    }
+	    return data;
+	}
 }
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
new file mode 100644
index 0000000..3e4db6e
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/bitwise/BitwiseOperation.java
@@ -0,0 +1,103 @@
+package edu.uci.ics.pregelix.bitwise;
+
+public class BitwiseOperation {
+	
+	public static byte[] shiftBitsLeft(byte[] bytes, final int leftShifts) {
+	   assert leftShifts >= 1 && leftShifts <= 7;
+
+	   byte[] resultBytes = new byte[bytes.length];
+	   for(int i = 0; i < bytes.length; i++)
+		   resultBytes[i] = bytes[i];
+	   final int rightShifts = 8 - leftShifts;
+
+	   byte previousByte = resultBytes[bytes.length - 1]; // keep the byte before modification
+	   resultBytes[resultBytes.length - 1] = (byte) (((resultBytes[resultBytes.length - 1] & 0xff) << leftShifts));
+	   for (int i = resultBytes.length - 2; i >= 0; i--) {
+	      byte tmp = resultBytes[i];
+	      resultBytes[i] = (byte) (((resultBytes[i] & 0xff) << leftShifts) | ((previousByte & 0xff) >> rightShifts));
+	      previousByte = tmp;
+	   }
+	   return resultBytes;
+	}
+	
+	public static byte[] shiftBitsRight(byte[] bytes, final int rightShifts) {
+	   assert rightShifts >= 1 && rightShifts <= 7;
+
+	   byte[] resultBytes = new byte[bytes.length];
+	   for(int i = 0; i < bytes.length; i++)
+		   resultBytes[i] = bytes[i];
+	   final int leftShifts = 8 - rightShifts;
+
+	   byte previousByte = resultBytes[0]; // keep the byte before modification
+	   resultBytes[0] = (byte) (((resultBytes[0] & 0xff) >> rightShifts));
+	   for (int i = 1; i < resultBytes.length; i++) {
+	      byte tmp = resultBytes[i];
+	      resultBytes[i] = (byte) (((resultBytes[i] & 0xff) >> rightShifts) | ((previousByte & 0xff) << leftShifts));
+	      previousByte = tmp;
+	   }
+	   return resultBytes;
+	}
+	
+	public static byte convertBinaryStringToByte(String input){
+	     int tmpInt = Integer.parseInt(input,2);
+	     byte tmpByte = (byte) tmpInt;
+	     return tmpByte;
+	}
+	
+	public static byte[] convertBinaryStringToBytes(String input){
+		input = complementString(input);
+		int numOfBytes = input.length() / 8;
+		byte[] bytes = new byte[numOfBytes];
+		for(int i = 0; i < numOfBytes; ++i)
+		{
+			bytes[i] = convertBinaryStringToByte(input.substring(8 * i, 8 * i + 8));
+		}
+		return bytes;
+	}
+	
+	public static String complementString(String input){
+		int remaining = input.length() % 8;
+		if(remaining == 0)
+			return input;
+		for(int i = 0; i < 8 - remaining; i++)
+			input += "0";
+		return input;
+	}
+	
+	public static String convertByteToBinaryString(byte b){
+		StringBuilder sb = new StringBuilder(Byte.SIZE);
+	    for( int i = 0; i < Byte.SIZE; i++ )
+	        sb.append((b << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+	    return sb.toString();
+	}
+	
+	public static String convertBytesToBinaryString(byte[] bytes){
+		StringBuilder sb = new StringBuilder(bytes.length * Byte.SIZE);
+	    for( int i = 0; i < Byte.SIZE * bytes.length; i++ )
+	        sb.append((bytes[i / Byte.SIZE] << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+	    return sb.toString();
+	}
+	
+	public static String convertBytesToBinaryStringKmer(byte[] bytes, int k){
+		StringBuilder sb = new StringBuilder(bytes.length * Byte.SIZE);
+	    for( int i = 0; i < Byte.SIZE * bytes.length; i++ )
+	        sb.append((bytes[i / Byte.SIZE] << i % Byte.SIZE & 0x80) == 0 ? '0' : '1');
+	    return sb.toString().substring(0,2*k);
+	}
+	
+	public static byte[] addLastTwoBits(byte[] bytes, int lengthOfChainVertex, String newBits){
+		String originalBytes = convertBytesToBinaryString(bytes).substring(0,2*lengthOfChainVertex);
+		originalBytes += newBits;
+		return convertBinaryStringToBytes(originalBytes);
+	}
+	
+	public static String getLastTwoBits(byte[] bytes, int k){
+		String sb = convertBytesToBinaryString(bytes);
+		int num = k / 4;
+		int extraBit = k % 4;
+		if(extraBit >= 1)
+			return sb.substring(8*num + 2*(extraBit-1),8*num + 2*extraBit);
+		else
+			return sb.substring(8*num - 2, 8*num);
+	}
+}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
index 0dbd800..eed352d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/example/io/MessageWritable.java
@@ -9,72 +9,126 @@
 
 public class MessageWritable implements WritableComparable<MessageWritable>{
 	/**
-	 * bytes stores the chains of connected DNA
+	 * sourceVertexId stores source vertexId when headVertex sends the message
+	 * 				  stores neighber vertexValue when pathVertex sends the message
+	 * chainVertexId stores the chains of connected DNA
 	 * file stores the point to the file that stores the chains of connected DNA
 	 */
-	private byte[] bytes;
+	private byte[] sourceVertexIdOrNeighberInfo;
+	private byte[] chainVertexId;
 	private File file;
+	private boolean isRear;
+	private int lengthOfChain;
+	private static int k = 3;
 	
 	public MessageWritable(){		
 	}
 	
-	public MessageWritable(byte[] bytes, File file){
-		set(bytes,file);
-	}
-	
-	public void set(byte[] bytes, File file){
-		this.bytes = bytes;
+	public void set(byte[] sourceVertexIdOrNeighberInfo, byte[] chainVertexId, File file){
+		this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+		this.chainVertexId = chainVertexId;
 		this.file = file;
+		this.isRear = false;
+		this.lengthOfChain = 0;
 	}
-			
-	public byte[] getBytes() {
-	    return bytes;
+
+	public byte[] getSourceVertexIdOrNeighberInfo() {
+		return sourceVertexIdOrNeighberInfo;
 	}
-	
-	public File getFile(){
+
+	public void setSourceVertexIdOrNeighberInfo(byte[] sourceVertexIdOrNeighberInfo) {
+		this.sourceVertexIdOrNeighberInfo = sourceVertexIdOrNeighberInfo;
+	}
+
+	public byte[] getChainVertexId() {
+		return chainVertexId;
+	}
+
+	public void setChainVertexId(byte[] chainVertexId) {
+		this.chainVertexId = chainVertexId;
+	}
+
+	public File getFile() {
 		return file;
 	}
 
+	public void setFile(File file) {
+		this.file = file;
+	}
+
+	public boolean isRear() {
+		return isRear;
+	}
+
+	public void setRear(boolean isRear) {
+		this.isRear = isRear;
+	}
+
+	public int getLengthOfChain() {
+		return lengthOfChain;
+	}
+
+	public void setLengthOfChain(int lengthOfChain) {
+		this.lengthOfChain = lengthOfChain;
+	}
+
+	public void incrementLength(){
+		this.lengthOfChain++;
+	}
+	
 	@Override
 	public void write(DataOutput out) throws IOException {
 		// TODO Auto-generated method stub
-		out.write(bytes);
-		out.writeUTF(file.getAbsolutePath()); 
+		out.writeInt(lengthOfChain);
+		if(lengthOfChain != 0)
+			out.write(chainVertexId);
+		out.write(sourceVertexIdOrNeighberInfo);
+		out.writeBoolean(isRear);
 	}
 
 	@Override
 	public void readFields(DataInput in) throws IOException {
 		// TODO Auto-generated method stub
-		in.readFully(bytes);
-		String absolutePath = in.readUTF();
-		file = new File(absolutePath);
+		lengthOfChain = in.readInt();
+		if(lengthOfChain != 0){
+			chainVertexId = new byte[(lengthOfChain-1)/4 + 1];
+			in.readFully(chainVertexId);
+		}
+		else
+			chainVertexId = new byte[0];
+		if(lengthOfChain % 2 == 0)
+			sourceVertexIdOrNeighberInfo = new byte[(k-1)/4 + 1];
+		else
+			sourceVertexIdOrNeighberInfo = new byte[1];
+		in.readFully(sourceVertexIdOrNeighberInfo);
+		isRear = in.readBoolean();
 	}
 
     @Override
     public int hashCode() {
     	int hashCode = 0;
-    	for(int i = 0; i < bytes.length; i++)
-    		hashCode = (int)bytes[i];
+    	for(int i = 0; i < chainVertexId.length; i++)
+    		hashCode = (int)chainVertexId[i];
         return hashCode;
     }
     @Override
     public boolean equals(Object o) {
         if (o instanceof MessageWritable) {
         	MessageWritable tp = (MessageWritable) o;
-            return bytes == tp.bytes && file == tp.file;
+            return chainVertexId == tp.chainVertexId && file == tp.file;
         }
         return false;
     }
     @Override
     public String toString() {
-        return bytes.toString() + "\t" + file.getAbsolutePath();
+        return chainVertexId.toString() + "\t" + file.getAbsolutePath();
     }
     
 	@Override
 	public int compareTo(MessageWritable tp) {
 		// TODO Auto-generated method stub
         int cmp;
-        if (bytes == tp.bytes)
+        if (chainVertexId == tp.chainVertexId)
             cmp = 0;
         else
             cmp = 1;
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java
new file mode 100644
index 0000000..ea972ef
--- /dev/null
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/hdfs/HDFSOperation.java
@@ -0,0 +1,123 @@
+package edu.uci.ics.pregelix.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HDFSOperation {
+
+	private static Configuration conf;
+	private static FileSystem hdfs;
+	private static Path path;
+	
+	public HDFSOperation() throws IOException{
+		conf = new Configuration();
+		hdfs = FileSystem.get(conf);
+		path = null;
+	}
+	
+	public static boolean insertHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+		path = new Path(fileName);
+		if (!hdfs.exists(path))
+			createHDFSFile(fileName,length,buffer);
+		else
+			appendHDFSFile(fileName,length,buffer);
+		return true;
+	}
+	
+	public static boolean createHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+		path = new Path(fileName);
+		if (hdfs.exists(path)){
+			System.out.println("Output already exists");
+		    return false;
+		}
+		/*if (!hdfs.isFile(path)){
+			System.out.println("Output should be a file");
+		    return false;
+		}*/
+		FSDataOutputStream outputStream = hdfs.create(path);
+		outputStream.writeInt(length);
+		outputStream.write(buffer); 
+		outputStream.close();
+		return true;
+	}
+	
+	public static boolean appendHDFSFile(String fileName, int length, byte[] buffer) throws IOException{
+		path = new Path(fileName);
+		if (!hdfs.exists(path)){
+			System.out.println("Output not found");
+		    return false;
+		}
+		if (!hdfs.isFile(path)){
+			System.out.println("Output should be a file");
+		    return false;
+		}
+		FSDataOutputStream outputStream = hdfs.append(path);
+		outputStream.writeInt(length);
+		outputStream.write(buffer); 
+		outputStream.close();
+		return true;
+	}
+	
+	public static boolean deleteHDFSFile(String fileName) throws IOException{
+		path = new Path(fileName);
+		if (!hdfs.exists(path)){
+			System.out.println("Input file not found");
+			return false;
+		}
+		if (!hdfs.isFile(path)){
+			System.out.println("Input should be a file");
+		    return false;
+		}
+		return hdfs.delete(path,true);
+	}
+	
+	public static boolean copyFromLocalFile(String srcFile, String dstFile) throws IOException{
+		Path srcPath = new Path(srcFile);
+		path = new Path(dstFile);
+		if (!hdfs.exists(path)){
+			System.out.println("Input file not found");
+			return false;
+		}
+		if (!hdfs.isFile(path)){
+			System.out.println("Input should be a file");
+		    return false;
+		}
+		hdfs.copyFromLocalFile(srcPath, path);
+		return true;
+	}
+	
+	public static void testReadAndWriteHDFS() throws Exception{
+		String inFileName = "testHDFS/testInput";
+		String outFileName = "testHDFS/testOutput";
+		Configuration conf = new Configuration();
+		FileSystem fs = FileSystem.get(conf);
+		Path inFile = new Path(inFileName);
+		Path outFile = new Path(outFileName);
+		if (!fs.exists(inFile)){
+			System.out.println("Input file not found");
+			return;
+		}
+		if (!fs.isFile(inFile)){
+			System.out.println("Input should be a file");
+		    return;
+		}
+		if (fs.exists(outFile)){
+			System.out.println("Output already exists");
+		    return;
+		}
+		FSDataInputStream in = fs.open(inFile);
+		FSDataOutputStream out = fs.create(outFile);
+		byte[] buffer = new byte[1024];
+		int bytesRead = 0;
+		while ((bytesRead = in.read(buffer)) > 0) {
+			out.write(buffer, 0, bytesRead);
+		}
+		in.close();
+		out.close();
+	}
+}