Add dirMapper to mapKey in P1
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
index e873856..55eed63 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/MapReduceVertex.java
@@ -73,8 +73,8 @@
         }
     }
     
-    public ArrayList<Byte> mapKeyByInternalKmer(Iterator<M> msgIterator){
-        ArrayList<Byte> kmerDir = new ArrayList<Byte>();
+    public void mapKeyByInternalKmer(Iterator<M> msgIterator){
+//        ArrayList<Byte> kmerDir = new ArrayList<Byte>();
         while(msgIterator.hasNext()){
             incomingMsg = msgIterator.next();
             String kmerString = incomingMsg.getInternalKmer().toString();
@@ -84,11 +84,11 @@
             VKmerBytesWritable kmer = new VKmerBytesWritable();
             kmerList = new VKmerListWritable();
             if(reverseKmer.compareTo(tmpKmer) > 0){
-                kmerDir.add(KmerDir.FORWARD);
+//                kmerDir.add(KmerDir.FORWARD);
                 kmer.setAsCopy(tmpKmer);
             }
             else{
-                kmerDir.add(KmerDir.REVERSE);
+//                kmerDir.add(KmerDir.REVERSE);
                 kmer.setAsCopy(reverseKmer);
             }
             if(!kmerMapper.containsKey(kmer)){
@@ -101,7 +101,6 @@
                 kmerMapper.put(kmer, kmerList);
             }
         }
-        return kmerDir;
     }
     
     public void reduceKeyByInternalKmer(){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
index 29465a4..306f124 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P1ForPathMergeVertex.java
@@ -1,6 +1,7 @@
 package edu.uci.ics.genomix.pregelix.operator.pathmerge;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 
 import edu.uci.ics.genomix.type.EdgeWritable;
@@ -11,6 +12,7 @@
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
 import edu.uci.ics.genomix.pregelix.io.message.PathMergeMessageWritable;
 import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
+import edu.uci.ics.genomix.pregelix.operator.pathmerge.MapReduceVertex.KmerDir;
 import edu.uci.ics.genomix.pregelix.type.MessageFlag;
 
 /**
@@ -22,6 +24,8 @@
     MapReduceVertex<VertexValueWritable, PathMergeMessageWritable> {
     
     private ArrayList<PathMergeMessageWritable> receivedMsg = new ArrayList<PathMergeMessageWritable>();
+    private HashMap<VKmerBytesWritable, ArrayList<Byte>> dirMapper = new HashMap<VKmerBytesWritable, ArrayList<Byte>>();
+    private ArrayList<Byte> dirList = new ArrayList<Byte>();
     
     private EdgeWritable tmpEdge = new EdgeWritable();
     /**
@@ -73,17 +77,66 @@
      */
     public void aggregateMsgAndGroupInFakeNode(Iterator<PathMergeMessageWritable> msgIterator){
         kmerMapper.clear();
+        dirMapper.clear();
         /** Mapper **/
-        ArrayList<Byte> kmerDir = mapKeyByInternalKmer(msgIterator);
-        boolean isFlip = kmerDir.get(0) == kmerDir.get(1) ? false : true;
+        mapKeyByInternalKmer(msgIterator);
+//        boolean isFlip = kmerDir.get(0) == kmerDir.get(1) ? false : true;
         /** Reducer **/
-        reduceKeyByInternalKmer(isFlip);
+        reduceKeyByInternalKmer();
+    }
+    
+    /**
+     * typical for P1
+     * @return 
+     */
+    @Override
+    public void mapKeyByInternalKmer(Iterator<PathMergeMessageWritable> msgIterator){
+        byte dir = 0;
+        while(msgIterator.hasNext()){
+            incomingMsg = msgIterator.next();
+            String kmerString = incomingMsg.getInternalKmer().toString();
+            tmpKmer.setByRead(kmerString.length(), kmerString.getBytes(), 0);
+            reverseKmer.setByReadReverse(kmerString.length(), kmerString.getBytes(), 0);
+
+            VKmerBytesWritable kmer = new VKmerBytesWritable();
+            kmerList = new VKmerListWritable();
+            if(reverseKmer.compareTo(tmpKmer) > 0){
+                dir = KmerDir.FORWARD;
+                kmer.setAsCopy(tmpKmer);
+            }
+            else{
+                dir = KmerDir.REVERSE;
+                kmer.setAsCopy(reverseKmer);
+            }
+            if(!kmerMapper.containsKey(kmer)){
+                //kmerList.reset();
+                kmerList.append(incomingMsg.getSourceVertexId());
+                kmerMapper.put(kmer, kmerList);
+            } else{
+                kmerList.setCopy(kmerMapper.get(kmer));
+                kmerList.append(incomingMsg.getSourceVertexId());
+                kmerMapper.put(kmer, kmerList);
+            }
+            //dirMapper
+            if(!dirMapper.containsKey(kmer)){
+                dirList.clear();
+                dirList.add(dir);
+                dirMapper.put(kmer, dirList);
+            } else{
+                dirList.clear();
+                dirList.addAll(dirMapper.get(kmer));
+                dirList.add(dir);
+                dirMapper.put(kmer, dirList);
+            }
+        }
     }
     
     /**
      * typical for P1
      */
-    public void reduceKeyByInternalKmer(boolean isFlip){
+    @Override
+    public void reduceKeyByInternalKmer(){
+        int i = 0;
         for (VKmerBytesWritable key : kmerMapper.keySet()) {
             kmerList = kmerMapper.get(key);
             //always delete kmerList(1), keep kmerList(0)
@@ -91,9 +144,11 @@
             //send kill message to kmerList(1), and carry with kmerList(0) to update edgeLists of kmerList(1)'s neighbor
             outgoingMsg.setFlag(MessageFlag.KILL);
             outgoingMsg.getNode().setInternalKmer(kmerList.getPosition(0));
+            boolean isFlip = dirMapper.get(key).get(0) == dirMapper.get(key).get(1) ? false : true;
             outgoingMsg.setFlip(isFlip);
             destVertexId.setAsCopy(kmerList.getPosition(1));
             sendMsg(destVertexId, outgoingMsg);
+            i = i + 2;
         }
     }
  
@@ -194,8 +249,9 @@
                     voteToHalt();
                 } else if(receivedMsg.size() == 1){
                     boolean isHead = isHeadNode();
+                    boolean isDead = isDeadNode();
                     processMerge(receivedMsg.get(0));
-                    if(isHead){
+                    if(isHead || isDead){
                         // NON-FAKE and Final vertice send msg to FAKE vertex 
                         sendMsgToFakeVertex();
                         //final vertex