update

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3262 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
index 5b75e05..8197ea8 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/GraphVertexOperation.java
@@ -305,9 +305,9 @@
 	/**
 	 * merge two BytesWritable. Ex. merge two vertexId
 	 */
-	public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2){
-		String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length);
-		return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), s2.length()/2);
+	public static byte[] mergeTwoChainVertex(byte[] b1, int length, byte[] b2, int length2){
+		String s2 = BitwiseOperation.convertBytesToBinaryString(b2).substring(2*k-2,2*length2);
+		return BitwiseOperation.mergeTwoBytesArray(b1, length, BitwiseOperation.convertBinaryStringToBytes(s2), length2-k+1);
 	}
 	/**
 	 * update right neighber
@@ -597,6 +597,21 @@
     	return;
 	}
 	/**
+	 *  test voteToHalt vertex information
+	 */
+	public static void testVoteVertexInfo(OutputStreamWriter writer, long step, byte[] vertexId, String reason){
+		try {
+    		writer.write("Step: " + step + "\r\n");
+			writer.write(reason + "\r\n");
+			writer.write("voteToHalt " + BitwiseOperation.convertBytesToBinaryStringKmer(vertexId, GraphVertexOperation.k) 
+				 + "\t" + GraphVertexOperation.convertBinaryStringToGenecode(
+		    				BitwiseOperation.convertBytesToBinaryStringKmer(
+		    						vertexId,GraphVertexOperation.k)) + "\r\n");
+			writer.write("\r\n");
+    	} catch (IOException e) { e.printStackTrace(); }
+    	return;
+	}
+	/**
 	 *  test merge chain vertex
 	 */
 	public static void testMergeChainVertex(OutputStreamWriter writer, long step, byte[] mergeChain,
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
index 5a4a47c..c341e0d 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/LogAlgorithmForMergeGraphVertex.java
@@ -138,28 +138,11 @@
 			tmpVertexValue = tmpVal.getValue();
 			tmpVertexState = tmpVal.getState();
 			tmpSourceVertextId = getVertexId().getBytes();
-			if(!GraphVertexOperation.isHead(new ByteWritable(tmpVertexValue))
-					&& !GraphVertexOperation.isRear(new ByteWritable(tmpVertexValue))){
-				// compute the right neighber - destination vertex Id
-				if(msgIterator.hasNext()){
-					tmpMsg = msgIterator.next();
-					tmpLengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
-					tmpMergeChainVertexId = tmpMsg.getChainVertexId();
-					byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId, 
-							tmpLengthOfMergeChainVertex);
-					tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, 
-							tmpMsg.getNeighberInfo());
-				}
-				else{
-					tmpMsg = new LogAlgorithmMessageWritable();
-					tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
-							getVertexValue().getValue());
-				}
-				
-				/*if(tmpMsg.getLengthOfChain()== 0){
-					tmpMsg.setLengthOfChain(GraphVertexOperation.k);
-					tmpMsg.setChainVertexId(getVertexId().getBytes());
-				}*/
+
+			if(getSuperstep() == 3){
+				tmpMsg = new LogAlgorithmMessageWritable();
+				tmpDestVertexId = GraphVertexOperation.getDestVertexId(tmpSourceVertextId, 
+						getVertexValue().getValue());
 				if(tmpVertexState == State.START_VERTEX){
 					tmpMsg.setMessage(Message.START);
 					tmpMsg.setSourceVertexId(tmpSourceVertextId);
@@ -177,6 +160,35 @@
 							tmpDestVertexId, tmpMsg, null);
 				}
 			}
+			else{
+				if(msgIterator.hasNext()){
+					tmpMsg = msgIterator.next();
+					tmpLengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain();
+					tmpMergeChainVertexId = tmpVal.getMergeChain();
+					byte[] lastKmer = GraphVertexOperation.getLastKmer(tmpMergeChainVertexId, 
+							tmpLengthOfMergeChainVertex);
+					tmpDestVertexId = GraphVertexOperation.getDestVertexId(lastKmer, 
+							tmpMsg.getNeighberInfo());
+					if(tmpVertexState == State.START_VERTEX){
+						tmpMsg.setMessage(Message.START);
+						tmpMsg.setSourceVertexId(tmpSourceVertextId);
+						sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+						//test
+						GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+								tmpDestVertexId, tmpMsg, null);
+					}
+					else if(tmpVertexState != State.END_VERTEX){
+						tmpMsg.setMessage(Message.NON);
+						tmpMsg.setSourceVertexId(tmpSourceVertextId);
+						sendMsg(new BytesWritable(tmpDestVertexId),tmpMsg);
+						//test
+						GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+								tmpDestVertexId, tmpMsg, null);
+					}
+				}
+				else
+					voteToHalt();
+			}
 		}
 		//path node sends message back to head node
 		else if(getSuperstep()%3 == 1){
@@ -185,17 +197,13 @@
 				tmpMsg = msgIterator.next();
 				tmpMessage = tmpMsg.getMessage();
 				tmpSourceVertextId = tmpMsg.getSourceVertexId();
-				tmpChainVertexId = tmpMsg.getChainVertexId();
 				if(tmpVal.getLengthOfMergeChain() == 0){
-					tmpMsg.setSourceVertexId(getVertexId().getBytes());
 					tmpVal.setLengthOfMergeChain(GraphVertexOperation.k);
 					tmpVal.setMergeChain(getVertexId().getBytes());
 					setVertexValue(tmpVal);
 				}
-				else{
-					tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
-					tmpMsg.setSourceVertexId(tmpVal.getMergeChain());
-				}
+				tmpMsg.setLengthOfChain(tmpVal.getLengthOfMergeChain());
+				tmpMsg.setChainVertexId(tmpVal.getMergeChain());
 				
 				tmpMsg.setNeighberInfo(tmpVal.getValue()); //set neighber
 				tmpMsg.setSourceVertexState(tmpVal.getState());
@@ -215,10 +223,10 @@
 				}
 			}
 			else{
-				if(!GraphVertexOperation.isHead(new ByteWritable(getVertexValue().getValue()))
-						&& !GraphVertexOperation.isRear(new ByteWritable(getVertexValue().getValue()))
-						&& getVertexValue().getState() != State.START_VERTEX){
-
+				if(getVertexValue().getState() != State.START_VERTEX
+						&& getVertexValue().getState() != State.END_VERTEX
+						&& tmpMessage != Message.END && tmpMessage != Message.START){
+					
 					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(), "not receive any message");
 					deleteVertex(getVertexId()); //killSelf because it doesn't receive any message
 				}
@@ -228,28 +236,34 @@
 			if(msgIterator.hasNext()){
 				tmpMsg = msgIterator.next();
 				tmpVal = getVertexValue();
-				if(tmpMsg.getMessage() == Message.END)
-					tmpVertexState = State.END_VERTEX;
-				else
-					tmpVertexState = getVertexValue().getState();
-				tmpVal.setState(tmpVertexState);
+				tmpVertexState = tmpVal.getState();
 				tmpSourceVertextId = getVertexId().getBytes();
 				if(tmpVertexState == State.TODELETE){
-					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(), tmpSourceVertextId, "already merged by head");
+					GraphVertexOperation.testDeleteVertexInfo(writer, getSuperstep(),
+							tmpSourceVertextId, "already merged by head");
 					deleteVertex(new BytesWritable(tmpSourceVertextId)); //killSelf
 				}
 				else{
+					if(tmpMsg.getMessage() == Message.END){
+						if(tmpVertexState != State.START_VERTEX)
+							tmpVertexState = State.END_VERTEX;
+						else
+							tmpVertexState = State.FINAL_VERTEX;
+					}
+						
+					tmpVal.setState(tmpVertexState);
 					if(getSuperstep() == 5){
 						lengthOfMergeChainVertex = GraphVertexOperation.k;
 						mergeChainVertexId = getVertexId().getBytes();
 					}
 					else{
-						lengthOfMergeChainVertex = tmpMsg.getLengthOfChain();
-						mergeChainVertexId = tmpMsg.getChainVertexId();
+						lengthOfMergeChainVertex = tmpVal.getLengthOfMergeChain(); 
+						mergeChainVertexId = tmpVal.getMergeChain(); 
 					}
-					mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId, lengthOfMergeChainVertex,
-							tmpMsg.getSourceVertexId());
-					lengthOfMergeChainVertex = 2*lengthOfMergeChainVertex - GraphVertexOperation.k + 1;
+					mergeChainVertexId = GraphVertexOperation.mergeTwoChainVertex(mergeChainVertexId,
+							lengthOfMergeChainVertex, tmpMsg.getChainVertexId(), tmpMsg.getLengthOfChain()); //tmpMsg.getSourceVertexId()
+					lengthOfMergeChainVertex = lengthOfMergeChainVertex + tmpMsg.getLengthOfChain()
+							- GraphVertexOperation.k + 1;
 					tmpVal.setLengthOfMergeChain(lengthOfMergeChainVertex);
 					tmpVal.setMergeChain(mergeChainVertexId);
 					
@@ -259,17 +273,31 @@
 					byte tmpByte = tmpMsg.getNeighberInfo();
 					tmpVertexValue = GraphVertexOperation.updateRightNeighber(getVertexValue().getValue(),tmpByte);
 					tmpVal.setValue(tmpVertexValue);
-					setVertexValue(tmpVal);
-					tmpMsg.setNeighberInfo(tmpVertexValue);
-					tmpMsg.setLengthOfChain(lengthOfMergeChainVertex);
-					tmpMsg.setChainVertexId(mergeChainVertexId);
-					sendMsg(getVertexId(),tmpMsg);
-					//test
-					GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
-							getVertexId().getBytes(), tmpMsg, null);
+					if(tmpVertexState != State.FINAL_VERTEX){
+						setVertexValue(tmpVal);
+						tmpMsg = new LogAlgorithmMessageWritable();
+						tmpMsg.setNeighberInfo(tmpVertexValue);
+						sendMsg(getVertexId(),tmpMsg);
+						//test
+						GraphVertexOperation.testMessageCommunication2(writer, getSuperstep(), getVertexId().getBytes(),
+								getVertexId().getBytes(), tmpMsg, null);
+					}
 				}
-				if(tmpVertexState == State.END_VERTEX)
+				if(tmpVertexState == State.END_VERTEX){
 					voteToHalt();
+					//test
+					GraphVertexOperation.testVoteVertexInfo(writer, getSuperstep(), getVertexId().getBytes(),
+							" it is the rear!");
+				}
+				if(tmpVertexState == State.FINAL_VERTEX){
+					voteToHalt();
+					try {
+						GraphVertexOperation.flushChainToFile(tmpVal.getMergeChain(), 
+								tmpVal.getLengthOfMergeChain(),getVertexId().getBytes());
+			    		writer.write("Step: " + getSuperstep() + "\r\n");
+			    		writer.write("Flush! " + "\r\n");
+					} catch (IOException e) { e.printStackTrace(); }
+				}
 			}
 		}
 		try {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
index 891fb8d..e2f7b19 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/SequenceFile/GenerateSequenceFile.java
@@ -372,8 +372,8 @@
 		 //createTestDat();
 		/* Path dir = new Path("data/webmap");
 		 Path inFile = new Path(dir, "part-1");
-		 Path outFile = new Path(dir, "part-1-out");
-		 generateNumOfLinesFromBigFile(inFile,outFile,10000);*/
+		 Path outFile = new Path(dir, "part-1-out-100");
+		 generateNumOfLinesFromBigFile(inFile,outFile,100);*/
 		 /**
 		  * AGC - A		C - TAT
 		  *  AGCATGCTAT
@@ -406,13 +406,15 @@
 		  *  
 		  *  Two strings 
 		  *  "AGCATGCTAT","TTCAGTACCCGC"
+		  *  
+		  *  AGCATGCTAT
 		  */ 
 		 
-		 generateSequenceFileFromGeneCode3("AGCATGCTAT");//GTCGATT  //before T: GGACG
+		 generateSequenceFileFromGeneCode3("AGCATGGCCTGCTAT");//GTCGATT  //before T: GGACG
 	 }
 	 public static void generateSequenceFileFromGeneCode3(String s) throws IOException{
 		 Configuration conf = new Configuration();
-	     Path outFile = new Path(outDir, "sequenceShortFileMergeTest");
+	     Path outFile = new Path(outDir, "11");//sequenceShortFileMergeTest
 	     FileSystem fileSys = FileSystem.get(conf);
 	     SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
 	         outFile, BytesWritable.class, KmerCountValue.class, 
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
index 3d97044..5ccaae5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/TestLoadGraphVertex.java
@@ -44,16 +44,16 @@
  */
 public class TestLoadGraphVertex extends Vertex<BytesWritable, ByteWritable, NullWritable, MessageWritable>{
 
-	private byte[] tmpVertexId;
-	private BytesWritable vid;
-	private TestLoadGraphVertex newVertex;
-	private MessageWritable tmpMsg = new MessageWritable();
+	//private byte[] tmpVertexId;
+	//private BytesWritable vid;
+	//private TestLoadGraphVertex newVertex;
+	//private MessageWritable tmpMsg = new MessageWritable();
 	/**
 	 * For test, just output original file
 	 */
 	@Override
 	public void compute(Iterator<MessageWritable> msgIterator) {
-		deleteVertex(getVertexId());
+		/*deleteVertex(getVertexId());
 		tmpVertexId = getVertexId().getBytes();
 		String a1 = "100100";
 		byte[] b1 = BitwiseOperation.convertBinaryStringToBytes(a1);
@@ -62,15 +62,15 @@
 		String valueString = "00000000";
 		byte value = BitwiseOperation.convertBinaryStringToByte(valueString);
 		if(getSuperstep() == 1 && Arrays.equals(b1,tmpVertexId)){
-			/*newVertex = new TestLoadGraphVertex();
+			newVertex = new TestLoadGraphVertex();
             vid.set(new BytesWritable(b2));
             newVertex.setVertexId(vid);
             newVertex.setVertexValue(getVertexValue());
-            addVertex(vid, newVertex);*/
+            addVertex(vid, newVertex);
 			//vertex.initialize(new BytesWritable(b2), new ByteWritable(value), null, null);
 			//addVertex(new BytesWritable(b2),this.createdNewLiveVertex());
 			deleteVertex(getVertexId());
-		}
+		}*/
 		/*String a2 = "100111";
 		byte[] b2 = BitwiseOperation.convertBinaryStringToBytes(a2);
 		String a3 = "11111111";
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
index a89add1..1900eca 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/KmerCountValue.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.pregelix.type;
 
 import java.io.DataInput;
@@ -20,22 +34,6 @@
 		count = 0;
 	}
 
-	public byte getAdjBitMap() {
-		return adjBitMap;
-	}
-
-	public void setAdjBitMap(byte adjBitMap) {
-		this.adjBitMap = adjBitMap;
-	}
-
-	public byte getCount() {
-		return count;
-	}
-
-	public void setCount(byte count) {
-		this.count = count;
-	}
-
 	@Override
 	public void readFields(DataInput arg0) throws IOException {
 		adjBitMap = arg0.readByte();
@@ -57,5 +55,19 @@
 		this.adjBitMap = bitmap;
 		this.count = count;
 	}
+	public byte getAdjBitMap() {
+        return adjBitMap;
+    }
+	
+	public void setAdjBitMap(byte adjBitMap) {
+		this.adjBitMap = adjBitMap;
+	}
 
+	public void setCount(byte count) {
+		this.count = count;
+	}
+
+	public byte getCount() {
+	    return count;
+	}
 }
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
index 61ee2fa..c62688e 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/pregelix/type/State.java
@@ -6,4 +6,5 @@
 	public static final int END_VERTEX = 2;
 	public static final int MID_VERTEX = 3;
 	public static final int TODELETE = 4;
+	public static final int FINAL_VERTEX = 5;
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
index 7ac4093..7affcbf 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobGen/JobGenerator.java
@@ -47,7 +47,7 @@
     
     private static void generateBinaryLoadGraphJob(String jobName, String outputPath) throws IOException {
     	PregelixJob job = new PregelixJob(jobName);
-    	job.setVertexClass(MergeGraphVertex.class);
+    	job.setVertexClass(TestLoadGraphVertex.class);
     	job.setVertexInputFormatClass(BinaryLoadGraphInputFormat.class);
         job.setVertexOutputFormatClass(BinaryLoadGraphOutputFormat.class);
         job.setOutputKeyClass(BytesWritable.class);
@@ -84,8 +84,8 @@
 	 */
 	public static void main(String[] args) throws IOException {
 		// TODO Auto-generated method stub
-		genLogAlgorithmForMergeGraph();
-		//genBinaryLoadGraph();
+		//genLogAlgorithmForMergeGraph();
+		genBinaryLoadGraph();
 		//genSequenceLoadGraph();
 		//genBasicBinaryLoadGraph();
 	}
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
index f347501..58e4ea3 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/pregelix/JobRun/RunJobTestSuite.java
@@ -40,7 +40,7 @@
 	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
 	private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-	private static final String DATA_PATH = "data/webmap/sequenceShortFileMergeTest";//sequenceFileMergeTest
+	private static final String DATA_PATH = "data/webmap/part-1-out";//sequenceFileMergeTest
 	private static final String HDFS_PATH = "/webmap/";
 	
 	private static final String HYRACKS_APP_NAME = "pregelix";