SimpleSplitRepeat is completed. An issue in graph building(hadoop): KmerList.appendList() doesn't check existed element
diff --git a/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
new file mode 100644
index 0000000..f2e3942
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/AdjSplitRepeat.txt
@@ -0,0 +1,3 @@
+1	AATAG
+2	GCATA
+3	ATAGC
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
index 1633c26..687474b 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -36,7 +36,7 @@
 		while (values.hasNext()) {
 		    tmpNode.set(values.next());
 		    outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
-		    outputNode.getFFList().appendList(tmpNode.getFFList());
+		    outputNode.getFFList().appendList(tmpNode.getFFList()); //appendList need to check if insert node exists
 		    outputNode.getFRList().appendList(tmpNode.getFRList());
 		    outputNode.getRFList().appendList(tmpNode.getRFList());
 		    outputNode.getRRList().appendList(tmpNode.getRRList());
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
index 127ab3e..ca7f67d 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -22,7 +22,7 @@
     private JobConf conf = new JobConf();
     private static final String ACTUAL_RESULT_DIR = "actual";
     private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private static final String DATA_PATH = "data/webmap/SplitRepeat.txt";
+    private static final String DATA_PATH = "data/webmap/AdjSplitRepeat.txt";
     private static final String HDFS_PATH = "/webmap";
     private static final String RESULT_PATH = "/result";
     
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
new file mode 100755
index 0000000..a983577
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/1/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000 b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
new file mode 100755
index 0000000..a187c64
--- /dev/null
+++ b/genomix/genomix-pregelix/data/SplitRepeat/AdjSplitRepeat/part-00000
Binary files differ
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
index 15dd0d8..3d87c60 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/splitrepeat/SplitRepeatVertex.java
@@ -6,8 +6,10 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 
+import edu.uci.ics.genomix.pregelix.client.Client;
+import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
+import edu.uci.ics.genomix.pregelix.format.InitialGraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.operator.pathmerge.BasicGraphCleanVertex;
@@ -16,100 +18,64 @@
 import edu.uci.ics.genomix.type.KmerListWritable;
 import edu.uci.ics.genomix.type.PositionWritable;
 import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 
 public class SplitRepeatVertex extends 
     BasicGraphCleanVertex{
     
-    public class CreatedVertex{
-        KmerBytesWritable createdVertexId;
-        String incomingDir;
-        String outgoingDir;
-        KmerBytesWritable incomingEdge;
-        KmerBytesWritable outgoingEdge;
+    public class EdgeDir{
+        public static final byte DIR_FF = 0 << 0;
+        public static final byte DIR_FR = 1 << 0;
+        public static final byte DIR_RF = 2 << 0;
+        public static final byte DIR_RR = 3 << 0;
+    }
+    
+    public class DeletedEdge{
+        private byte dir;
+        private KmerBytesWritable edge;
         
-        public CreatedVertex(){
-            createdVertexId = new KmerBytesWritable(kmerSize);
-            incomingDir = "";
-            outgoingDir = "";
-            incomingEdge = new KmerBytesWritable(kmerSize);
-            outgoingEdge = new KmerBytesWritable(kmerSize);
-        }
-        
-        public void clear(){
-            createdVertexId.reset(kmerSize);
-            incomingDir = "";
-            outgoingDir = "";
-            incomingEdge.reset(kmerSize);
-            outgoingEdge.reset(kmerSize);
-        }
-        
-        public KmerBytesWritable getCreatedVertexId() {
-            return createdVertexId;
+        public DeletedEdge(){
+            dir = 0;
+            edge = new KmerBytesWritable(kmerSize);
         }
 
-        public void setCreatedVertexId(KmerBytesWritable createdVertexId) {
-            this.createdVertexId = createdVertexId;
+        public byte getDir() {
+            return dir;
         }
 
-        public String getIncomingDir() {
-            return incomingDir;
+        public void setDir(byte dir) {
+            this.dir = dir;
         }
 
-        public void setIncomingDir(String incomingDir) {
-            this.incomingDir = incomingDir;
+        public KmerBytesWritable getEdge() {
+            return edge;
         }
 
-        public String getOutgoingDir() {
-            return outgoingDir;
-        }
-
-        public void setOutgoingDir(String outgoingDir) {
-            this.outgoingDir = outgoingDir;
-        }
-
-        public KmerBytesWritable getIncomingEdge() {
-            return incomingEdge;
-        }
-
-        public void setIncomingEdge(KmerBytesWritable incomingEdge) {
-            this.incomingEdge.set(incomingEdge);
-        }
-
-        public KmerBytesWritable getOutgoingEdge() {
-            return outgoingEdge;
-        }
-
-        public void setOutgoingEdge(KmerBytesWritable outgoingEdge) {
-            this.outgoingEdge.set(outgoingEdge);
+        public void setEdge(KmerBytesWritable edge) {
+            this.edge.set(edge);
         }
     }
     
-    private String[][] connectedTable = new String[][]{
-            {"FF", "RF"},
-            {"FF", "RR"},
-            {"FR", "RF"},
-            {"FR", "RR"}
+    private byte[][] connectedTable = new byte[][]{
+            {EdgeDir.DIR_RF, EdgeDir.DIR_FF},
+            {EdgeDir.DIR_RF, EdgeDir.DIR_FR},
+            {EdgeDir.DIR_RR, EdgeDir.DIR_FF},
+            {EdgeDir.DIR_RR, EdgeDir.DIR_FR}
     };
     public static Set<String> existKmerString = new HashSet<String>();
     private Set<Long> readIdSet;
     private Set<Long> incomingReadIdSet = new HashSet<Long>();
     private Set<Long> outgoingReadIdSet = new HashSet<Long>();
     private Set<Long> selfReadIdSet = new HashSet<Long>();
-    private Set<Long> incomingEdgeIntersection = new HashSet<Long>();
-    private Set<Long> outgoingEdgeIntersection = new HashSet<Long>();
     private Set<Long> neighborEdgeIntersection = new HashSet<Long>();
     private Map<KmerBytesWritable, Set<Long>> kmerMap = new HashMap<KmerBytesWritable, Set<Long>>();
-    private KmerBytesWritable incomingEdge = null;
-    private KmerBytesWritable outgoingEdge = null;
     private KmerListWritable incomingEdgeList = null; 
     private KmerListWritable outgoingEdgeList = null; 
     private byte incomingEdgeDir = 0;
     private byte outgoingEdgeDir = 0;
     
     protected KmerBytesWritable createdVertexId = null;  
-    private CreatedVertex createdVertex = new CreatedVertex();
-    public static Set<CreatedVertex> createdVertexSet = new HashSet<CreatedVertex>();
     
     /**
      * initiate kmerSize, maxIteration
@@ -130,11 +96,7 @@
         if(outgoingEdgeList == null)
             outgoingEdgeList = new KmerListWritable(kmerSize);
         if(createdVertexId == null)
-            createdVertexId = new KmerBytesWritable(kmerSize + 1);
-        if(incomingEdge == null)
-            incomingEdge = new KmerBytesWritable(kmerSize);
-        if(outgoingEdge == null)
-            outgoingEdge = new KmerBytesWritable(kmerSize);
+            createdVertexId = new KmerBytesWritable(kmerSize);//kmerSize + 1
     }
     
     /**
@@ -156,7 +118,184 @@
         return sb.toString();
     }
     
+    /**
+     * GenerateString only for test
+     */
+    public String generateString(){
+        if(existKmerString.isEmpty()){
+            existKmerString.add("AAA");
+            return "AAA";
+        }
+        else
+            return "GGG";
+    }
+    
+    public void generateKmerMap(Iterator<MessageWritable> msgIterator){
+        kmerMap.clear();
+        while(msgIterator.hasNext()){
+            incomingMsg = msgIterator.next();
+            readIdSet = new HashSet<Long>();
+            for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
+                readIdSet.add(nodeId.getReadId());
+            }
+            kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
+        }
+    }
+    
+    public void setSelfReadIdSet(){
+        selfReadIdSet.clear();
+        for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
+            selfReadIdSet.add(nodeId.getReadId());
+        }    
+    }
+    
     @SuppressWarnings({ "rawtypes", "unchecked" })
+    public void createNewVertex(int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+        Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+        KmerBytesWritable vertexId = new KmerBytesWritable(kmerSize);
+        VertexValueWritable vertexValue = new VertexValueWritable(kmerSize);
+        //add the corresponding edge to new vertex
+        switch(connectedTable[i][0]){
+            case EdgeDir.DIR_RF:
+                vertexValue.getRFList().append(incomingEdge);
+                break;
+            case EdgeDir.DIR_RR:
+                vertexValue.getRRList().append(incomingEdge);
+                break;
+        }
+        switch(connectedTable[i][1]){
+            case EdgeDir.DIR_FF:
+                vertexValue.getFFList().append(outgoingEdge);
+                break;
+            case EdgeDir.DIR_FR:
+                vertexValue.getFRList().append(outgoingEdge);
+                break;
+        }
+        vertexId.set(createdVertexId);
+        vertex.setVertexId(vertexId);
+        vertex.setVertexValue(vertexValue);
+        
+        addVertex(vertexId, vertex);
+    }
+    
+    public void sendMsgToUpdateEdge(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+        outgoingMsg.setCreatedVertexId(createdVertexId);
+        outgoingMsg.setSourceVertexId(getVertexId());
+        
+        outgoingMsg.setFlag(incomingEdgeDir);
+        destVertexId.set(incomingEdge);
+        sendMsg(destVertexId, outgoingMsg);
+        
+        outgoingMsg.setFlag(outgoingEdgeDir);
+        destVertexId.set(outgoingEdge);
+        sendMsg(destVertexId, outgoingMsg);
+    }
+    
+    public void storeDeletedEdge(Set<DeletedEdge> deletedEdges, int i, KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+        DeletedEdge deletedIncomingEdge = new DeletedEdge();
+        DeletedEdge deletedOutgoingEdge = new DeletedEdge();
+        switch(connectedTable[i][0]){
+            case EdgeDir.DIR_RF:
+                deletedIncomingEdge.setDir(EdgeDir.DIR_RF);
+                deletedIncomingEdge.setEdge(incomingEdge);
+                break;
+            case EdgeDir.DIR_RR:
+                deletedIncomingEdge.setDir(EdgeDir.DIR_RR);
+                deletedIncomingEdge.setEdge(incomingEdge);
+                break;
+        }
+        switch(connectedTable[i][1]){
+            case EdgeDir.DIR_FF:
+                deletedOutgoingEdge.setDir(EdgeDir.DIR_FF);
+                deletedOutgoingEdge.setEdge(outgoingEdge);
+                break;
+            case EdgeDir.DIR_FR:
+                deletedOutgoingEdge.setDir(EdgeDir.DIR_FR);
+                deletedOutgoingEdge.setEdge(outgoingEdge);
+                break;
+        }
+        deletedEdges.add(deletedIncomingEdge);
+        deletedEdges.add(deletedOutgoingEdge);
+    }
+    public void deleteEdgeFromOldVertex(DeletedEdge deleteEdge){
+        switch(deleteEdge.dir){
+            case EdgeDir.DIR_RF:
+                getVertexValue().getRFList().remove(deleteEdge.getEdge());
+                break;
+            case EdgeDir.DIR_RR:
+                getVertexValue().getRRList().remove(deleteEdge.getEdge());
+                break;
+            case EdgeDir.DIR_FF:
+                getVertexValue().getFFList().remove(deleteEdge.getEdge());
+                break;
+            case EdgeDir.DIR_FR:
+                getVertexValue().getFRList().remove(deleteEdge.getEdge());
+                break;
+        }
+    }
+    
+    public void setEdgeListAndEdgeDir(int i){
+        switch(connectedTable[i][0]){
+            case EdgeDir.DIR_RF:
+                incomingEdgeList.set(getVertexValue().getRFList());
+                incomingEdgeDir = MessageFlag.DIR_RF;
+                break;
+            case EdgeDir.DIR_RR:
+                incomingEdgeList.set(getVertexValue().getRRList());
+                incomingEdgeDir = MessageFlag.DIR_RR;
+                break;
+        }
+        switch(connectedTable[i][1]){
+            case EdgeDir.DIR_FF:
+                outgoingEdgeList.set(getVertexValue().getFFList());
+                outgoingEdgeDir = MessageFlag.DIR_FF;
+                break;
+            case EdgeDir.DIR_FR:
+                outgoingEdgeList.set(getVertexValue().getFRList());
+                outgoingEdgeDir = MessageFlag.DIR_FR;
+                break;
+        }
+    }
+    
+    public void setNeighborEdgeIntersection(KmerBytesWritable incomingEdge, KmerBytesWritable outgoingEdge){
+        outgoingReadIdSet.clear(); 
+        incomingReadIdSet.clear();
+        tmpKmer.set(incomingEdge);
+        incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
+        tmpKmer.set(outgoingEdge);
+        outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
+        
+        //set all neighberEdge readId intersection
+        neighborEdgeIntersection.addAll(selfReadIdSet);
+        neighborEdgeIntersection.retainAll(incomingReadIdSet);
+        neighborEdgeIntersection.retainAll(outgoingReadIdSet);
+    }
+    
+    public void updateEdgeListPointToNewVertex(){
+        byte meToNeighborDir = incomingMsg.getFlag();
+        byte neighborToMeDir = mirrorDirection(meToNeighborDir);
+        switch(neighborToMeDir){
+            case MessageFlag.DIR_FF:
+                getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
+                getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
+                break;
+            case MessageFlag.DIR_FR:
+                getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
+                getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
+                break;
+            case MessageFlag.DIR_RF:
+                getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
+                getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
+                break;
+            case MessageFlag.DIR_RR:
+                getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
+                getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
+                break;
+        }
+    }
+    
     @Override
     public void compute(Iterator<MessageWritable> msgIterator) {
         initVertex();
@@ -175,162 +314,103 @@
             }
             voteToHalt();
         } else if(getSuperstep() == 3){
-            kmerMap.clear();
-            createdVertexSet.clear();
-            while(msgIterator.hasNext()){
-                incomingMsg = msgIterator.next();
-                readIdSet = new HashSet<Long>();
-                for(PositionWritable nodeId : incomingMsg.getNodeIdList()){
-                    readIdSet.add(nodeId.getReadId());
-                }
-                kmerMap.put(incomingMsg.getSourceVertexId(), readIdSet);
-            }
+            /** generate KmerMap map kmer(key) to readIdSet(value) **/
+            generateKmerMap(msgIterator);
+            
+            /** set self readId set **/
+            setSelfReadIdSet();
+            
+            int count = 0;
+            //A set storing deleted edges
+            Set<DeletedEdge> deletedEdges = new HashSet<DeletedEdge>();
             /** process connectedTable **/
             for(int i = 0; i < 4; i++){
-                switch(connectedTable[i][0]){
-                    case "FF":
-                        outgoingEdgeList.set(getVertexValue().getFFList());
-                        outgoingEdgeDir = MessageFlag.DIR_FF;
-                        break;
-                    case "FR":
-                        outgoingEdgeList.set(getVertexValue().getFRList());
-                        outgoingEdgeDir = MessageFlag.DIR_FR;
-                        break;
-                }
-                switch(connectedTable[i][1]){
-                    case "RF":
-                        incomingEdgeList.set(getVertexValue().getRFList());
-                        incomingEdgeDir = MessageFlag.DIR_RF;
-                        break;
-                    case "RR":
-                        incomingEdgeList.set(getVertexValue().getRRList());
-                        incomingEdgeDir = MessageFlag.DIR_RR;
-                        break;
-                }
-                selfReadIdSet.clear();
-                for(PositionWritable nodeId : getVertexValue().getNodeIdList()){
-                    selfReadIdSet.add(nodeId.getReadId());
-                }     
-                for(KmerBytesWritable incomingEdge : incomingEdgeList){
-                    for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
-                        outgoingReadIdSet.clear(); 
-                        incomingReadIdSet.clear();
-                        tmpKmer.set(incomingEdge);
-                        incomingReadIdSet.addAll(kmerMap.get(tmpKmer));
-                        tmpKmer.set(outgoingEdge);
-                        outgoingReadIdSet.addAll(kmerMap.get(tmpKmer));
-                        
-                        //set all neighberEdge readId intersection
-                        neighborEdgeIntersection.addAll(selfReadIdSet);
-                        neighborEdgeIntersection.retainAll(incomingReadIdSet);
-                        neighborEdgeIntersection.retainAll(outgoingReadIdSet);
-//                        //set outgoingEdge readId intersection
-//                        outgoingEdgeIntersection.addAll(selfReadIdSet);
-//                        outgoingEdgeIntersection.retainAll(outgoingReadIdSet);
-//                        outgoingEdgeIntersection.removeAll(neighborEdgeIntersection); 
-//                        //set incomingEdge readId intersection
-//                        incomingEdgeIntersection.addAll(selfReadIdSet);
-//                        incomingEdgeIntersection.retainAll(incomingReadIdSet);
-//                        incomingEdgeIntersection.removeAll(neighborEdgeIntersection);
+                /** set edgeList and edgeDir based on connectedTable **/
+                setEdgeListAndEdgeDir(i);
+                
+                KmerBytesWritable incomingEdge = new KmerBytesWritable(kmerSize);
+                KmerBytesWritable outgoingEdge = new KmerBytesWritable(kmerSize);
+                for(int x = 0; x < incomingEdgeList.getCountOfPosition(); x++){
+                    for(int y = 0; y < outgoingEdgeList.getCountOfPosition(); y++){
+                        incomingEdge.set(incomingEdgeList.getPosition(x));
+                        outgoingEdge.set(outgoingEdgeList.getPosition(y));
+                        /** set neighborEdge readId intersection **/
+                        setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
                         
                         if(!neighborEdgeIntersection.isEmpty()){
-                            createdVertex.clear();
-                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
-                            createdVertex.setCreatedVertexId(createdVertexId);
-                            createdVertex.setIncomingDir(connectedTable[i][1]);
-                            createdVertex.setOutgoingDir(connectedTable[i][0]);
-                            createdVertex.setIncomingEdge(incomingEdge);
-                            createdVertex.setOutgoingEdge(outgoingEdge);
-                            createdVertexSet.add(createdVertex);
+                            if(count == 0)
+                                createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+                            else
+                                createdVertexId.setByRead("GGG".getBytes(), 0);
+                            count++;
                             
-                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
-                            outgoingMsg.setSourceVertexId(getVertexId());
-                            outgoingMsg.setFlag(incomingEdgeDir);
-                            sendMsg(incomingEdge, outgoingMsg);
-                            outgoingMsg.setFlag(outgoingEdgeDir);
-                            sendMsg(outgoingEdge, outgoingMsg);
+                            /** create new/created vertex **/
+                            createNewVertex(i, incomingEdge, outgoingEdge);
+                            
+                            /** send msg to neighbors to update their edges to new vertex **/
+                            sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+                            
+                            /** store deleted edge **/
+                            storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
                         }
-//                        if(!incomingEdgeIntersection.isEmpty()){
-//                            createdVertex.clear();
-//                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
-//                            createdVertex.setCreatedVertexId(createdVertexId);
-//                            createdVertex.setIncomingDir(connectedTable[i][1]);
-//                            createdVertex.setIncomingEdge(incomingEdge);
-//                            createdVertexSet.add(createdVertex);
-//                            
-//                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
-//                            outgoingMsg.setSourceVertexId(getVertexId());
-//                            outgoingMsg.setFlag(incomingEdgeDir);
-//                            sendMsg(incomingEdge, outgoingMsg);
-//                        }
-//                        
-//                        if(!outgoingEdgeIntersection.isEmpty()){
-//                            createdVertex.clear();
-//                            createdVertexId.setByRead(generaterRandomString(kmerSize + 1).getBytes(), 0);
-//                            createdVertex.setCreatedVertexId(createdVertexId);
-//                            createdVertex.setOutgoingDir(connectedTable[i][0]);
-//                            createdVertex.setOutgoingEdge(outgoingEdge);
-//                            createdVertexSet.add(createdVertex);
-//                            
-//                            outgoingMsg.setCreatedVertexId(createdVertex.getCreatedVertexId());
-//                            outgoingMsg.setSourceVertexId(getVertexId());
-//                            outgoingMsg.setFlag(outgoingEdgeDir);
-//                            sendMsg(outgoingEdge, outgoingMsg);
-//                        }
                     }
                 }
+                
+//                for(KmerBytesWritable incomingEdge : incomingEdgeList){
+//                    for(KmerBytesWritable outgoingEdge : outgoingEdgeList){
+//                        /** set neighborEdge readId intersection **/
+//                        setNeighborEdgeIntersection(incomingEdge, outgoingEdge);
+//                        
+//                        if(!neighborEdgeIntersection.isEmpty()){
+//                            if(count == 0)
+//                                createdVertexId.setByRead("AAA".getBytes(), 0);//kmerSize + 1 generaterRandomString(kmerSize).getBytes()
+//                            else
+//                                createdVertexId.setByRead("GGG".getBytes(), 0);
+//                            count++;
+//                            
+//                            /** create new/created vertex **/
+//                            createNewVertex(i, incomingEdge, outgoingEdge);
+//                            
+//                            /** send msg to neighbors to update their edges to new vertex **/
+//                            sendMsgToUpdateEdge(incomingEdge, outgoingEdge);
+//                            
+//                            /** store deleted edge **/
+//                            storeDeletedEdge(deletedEdges, i, incomingEdge, outgoingEdge);
+//                        }
+//                    }
+//                }
             }
+            /** delete extra edges from old vertex **/
+            for(DeletedEdge deletedEdge : deletedEdges){
+                deleteEdgeFromOldVertex(deletedEdge);
+            }
+            
+            /** Old vertex delete or voteToHalt **/
+            if(getVertexValue().getDegree() == 0)//if no any edge, delete
+                deleteVertex(getVertexId());
+            else
+                voteToHalt();
         } else if(getSuperstep() == 4){
             while(msgIterator.hasNext()){
                 incomingMsg = msgIterator.next();
                 /** update edgelist to new/created vertex **/
-                byte meToNeighborDir = incomingMsg.getFlag();
-                byte neighborToMeDir = mirrorDirection(meToNeighborDir);
-                switch(neighborToMeDir){
-                    case MessageFlag.DIR_FF:
-                        getVertexValue().getFFList().remove(incomingMsg.getSourceVertexId());
-                        getVertexValue().getFFList().append(incomingMsg.getCreatedVertexId());
-                        break;
-                    case MessageFlag.DIR_FR:
-                        getVertexValue().getFRList().remove(incomingMsg.getSourceVertexId());
-                        getVertexValue().getFRList().append(incomingMsg.getCreatedVertexId());
-                        break;
-                    case MessageFlag.DIR_RF:
-                        getVertexValue().getRFList().remove(incomingMsg.getSourceVertexId());
-                        getVertexValue().getRFList().append(incomingMsg.getCreatedVertexId());
-                        break;
-                    case MessageFlag.DIR_RR:
-                        getVertexValue().getRRList().remove(incomingMsg.getSourceVertexId());
-                        getVertexValue().getRRList().append(incomingMsg.getCreatedVertexId());
-                        break;
-                }
-                /** add new/created vertex **/
-                for(CreatedVertex v : createdVertexSet){
-                    Vertex vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
-                    vertex.getMsgList().clear();
-                    vertex.getEdges().clear();
-                    VertexValueWritable vertexValue = new VertexValueWritable();
-                    switch(v.incomingDir){
-                        case "RF":
-                            vertexValue.getRFList().append(v.incomingEdge);
-                            break;
-                        case "RR":
-                            vertexValue.getRRList().append(v.incomingEdge);
-                            break;
-                    }
-                    switch(v.outgoingDir){
-                        case "FF":
-                            vertexValue.getFFList().append(v.outgoingEdge);
-                            break;
-                        case "FR":
-                            vertexValue.getFRList().append(v.outgoingEdge);
-                            break;
-                    }
-                    vertex.setVertexId(v.getCreatedVertexId());
-                    vertex.setVertexValue(vertexValue);
-                }
-                createdVertexSet.clear();
+                updateEdgeListPointToNewVertex();
             }
+            voteToHalt();
         }
     }
+    
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(SplitRepeatVertex.class.getSimpleName());
+        job.setVertexClass(SplitRepeatVertex.class);
+        /**
+         * BinaryInput and BinaryOutput
+         */
+        job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
+        job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
+        job.setDynamicVertexValueSize(true);
+        job.setOutputKeyClass(KmerBytesWritable.class);
+        job.setOutputValueClass(VertexValueWritable.class);
+        Client.run(args, job);
+    }
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
index 21ffe34..1948acd 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/SplitRepeatSmallTestSuite.java
@@ -45,7 +45,7 @@
     //P4ForMergeGraph/bin/read
     public static final String PreFix = "data/SplitRepeat"; //"graphbuildresult";
     public static final String[] TestDir = { PreFix + File.separator
-    + "SimpleTest"};
+    + "AdjSplitRepeat"};
     private static final String ACTUAL_RESULT_DIR = "data/actual/splitrepeat";
     private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
     private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
@@ -59,7 +59,7 @@
     private MiniDFSCluster dfsCluster;
 
     private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
+    private int numberOfNC = 1;
 
     public void setUp() throws Exception {
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);