PathMerge aggregator is completed
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index 7b82dc1..29183c5 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -58,6 +58,7 @@
     protected byte incomingEdgeDir = 0; //SplitRepeat
     protected byte outgoingEdgeDir = 0; //SplitRepeat
     
+    protected HashMapWritable<ByteWritable, VLongWritable> counters = new HashMapWritable<ByteWritable, VLongWritable>();
     /**
      * initiate kmerSize, maxIteration
      */
@@ -698,6 +699,23 @@
         outgoingEdgeDir = connectedTable[i][1];
     }
     
+    
+    /**
+     * set statistics counter
+     */
+    public void updateStatisticsCounter(byte counterName){
+        ByteWritable counterNameWritable = new ByteWritable(counterName);
+        if(counters.containsKey(counterNameWritable))
+            counters.get(counterNameWritable).set(counters.get(counterNameWritable).get() + 1);
+        else
+            counters.put(counterNameWritable, new VLongWritable(1));
+    }
+    
+    /**
+     * read statistics counters
+     * @param conf
+     * @return
+     */
     public static HashMapWritable<ByteWritable, VLongWritable> readStatisticsCounterResult(Configuration conf) {
         try {
             VertexValueWritable value = (VertexValueWritable) IterationUtils
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 57eb2f2..174fb84 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -4,19 +4,12 @@
 import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-
 import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.genomix.config.GenomixJobConf;
 import edu.uci.ics.genomix.pregelix.client.Client;
 import edu.uci.ics.genomix.pregelix.format.GraphCleanInputFormat;
 import edu.uci.ics.genomix.pregelix.format.GraphCleanOutputFormat;
-import edu.uci.ics.genomix.pregelix.io.ByteWritable;
-import edu.uci.ics.genomix.pregelix.io.HashMapWritable;
 import edu.uci.ics.genomix.pregelix.io.PathMergeMessageWritable;
-import edu.uci.ics.genomix.pregelix.io.VLongWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
 import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
@@ -73,7 +66,6 @@
     private boolean nextHead;
     private boolean prevHead;
     
-    HashMapWritable<ByteWritable, VLongWritable> counters = new HashMapWritable<ByteWritable, VLongWritable>();
     /**
      * initiate kmerSize, maxIteration
      */
@@ -109,6 +101,8 @@
         tmpValue.reset();
         if(getSuperstep() > 1)
             StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+        counters.clear();
+        getVertexValue().getCounters().clear();
     }
 
     protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
@@ -153,139 +147,108 @@
         return false;
     }
     
-    /**
-     * set statistics counter
-     */
-    public void updateStatisticsCounter(byte counterName){
-        ByteWritable counterNameWritable = new ByteWritable(counterName);
-        if(counters.containsKey(counterNameWritable))
-            counters.get(counterNameWritable).set(counters.get(counterNameWritable).get() + 1);
-        else
-            counters.put(counterNameWritable, new VLongWritable(1));
-    }
-    
     @Override
     public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
         initVertex();
-        counters.clear();
-        getVertexValue().getCounters().clear();
-        if(getSuperstep() == 1){
-//            if(getVertexId().toString().contains("AT") || getVertexId().toString().contains("GA")){
-                updateStatisticsCounter(StatisticsCounter.MergedNodes);
-//                updateStatisticsCounter(StatisticsCounter.MergedPaths);
-                getVertexValue().setCounters(counters);
-                activate();
-//            }
-        } else if(getSuperstep() == 2){
-            if(getVertexId().toString().contains("AA")){
-                updateStatisticsCounter(StatisticsCounter.MergedNodes);
-            } else{
-                updateStatisticsCounter(StatisticsCounter.MergedPaths);
+        if (getSuperstep() == 1)
+            startSendMsg();
+        else if (getSuperstep() == 2)
+            initState(msgIterator);
+        else if (getSuperstep() % 4 == 3){
+            outFlag |= headFlag;
+            
+            outFlag |= MessageFlag.NO_MERGE;
+            setStateAsNoMerge();
+            
+            // only PATH vertices are present. Find the ID's for my neighbors
+            curKmer.setAsCopy(getVertexId());
+            
+            curHead = isNodeRandomHead(curKmer);
+            
+            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
+            // We prevent merging towards non-path nodes
+            hasNext = setNextInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHNEXT));
+            hasPrev = setPrevInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHPREV));
+            if (hasNext || hasPrev) {
+                if (curHead) {
+                    if (hasNext && !nextHead) {
+                        // compress this head to the forward tail
+                		sendUpdateMsgToPredecessor(); 
+                    } else if (hasPrev && !prevHead) {
+                        // compress this head to the reverse tail
+                        sendUpdateMsgToSuccessor();
+                    } 
+                }
+                else {
+                    // I'm a tail
+                    if (hasNext && hasPrev) {
+                         if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
+                            // tails on both sides, and I'm the "local minimum"
+                            // compress me towards the tail in forward dir
+                            sendUpdateMsgToPredecessor();
+                        }
+                    } else if (!hasPrev) {
+                        // no previous node
+                        if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
+                            // merge towards tail in forward dir
+                            sendUpdateMsgToPredecessor();
+                        }
+                    } else if (!hasNext) {
+                        // no next node
+                        if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
+                            // merge towards tail in reverse dir
+                            sendUpdateMsgToSuccessor();
+                        }
+                    }
+                }
             }
-            getVertexValue().setCounters(counters);
-            voteToHalt();
-        } 
-//        else{
-
-//            updateStatisticsCounter(StatisticsCounter.MergedPaths);
-//            getVertexValue().setCounters(counters);
-//            voteToHalt();
-//        }
-//        if (getSuperstep() == 1)
-//            startSendMsg();
-//        else if (getSuperstep() == 2)
-//            initState(msgIterator);
-//        else if (getSuperstep() % 4 == 3){
-//            outFlag |= headFlag;
-//            
-//            outFlag |= MessageFlag.NO_MERGE;
-//            setStateAsNoMerge();
-//            
-//            // only PATH vertices are present. Find the ID's for my neighbors
-//            curKmer.setAsCopy(getVertexId());
-//            
-//            curHead = isNodeRandomHead(curKmer);
-//            
-//            // the headFlag and tailFlag's indicate if the node is at the beginning or end of a simple path. 
-//            // We prevent merging towards non-path nodes
-//            hasNext = setNextInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHNEXT));
-//            hasPrev = setPrevInfo(getVertexValue()) && (headFlag == 0 || (headFlag > 0 && headMergeDir == MessageFlag.HEAD_SHOULD_MERGEWITHPREV));
-//            if (hasNext || hasPrev) {
-//                if (curHead) {
-//                    if (hasNext && !nextHead) {
-//                        // compress this head to the forward tail
-//                		sendUpdateMsgToPredecessor(); 
-//                    } else if (hasPrev && !prevHead) {
-//                        // compress this head to the reverse tail
-//                        sendUpdateMsgToSuccessor();
-//                    } 
-//                }
-//                else {
-//                    // I'm a tail
-//                    if (hasNext && hasPrev) {
-//                         if ((!nextHead && !prevHead) && (curKmer.compareTo(nextKmer) < 0 && curKmer.compareTo(prevKmer) < 0)) {
-//                            // tails on both sides, and I'm the "local minimum"
-//                            // compress me towards the tail in forward dir
-//                            sendUpdateMsgToPredecessor();
-//                        }
-//                    } else if (!hasPrev) {
-//                        // no previous node
-//                        if (!nextHead && curKmer.compareTo(nextKmer) < 0) {
-//                            // merge towards tail in forward dir
-//                            sendUpdateMsgToPredecessor();
-//                        }
-//                    } else if (!hasNext) {
-//                        // no next node
-//                        if (!prevHead && curKmer.compareTo(prevKmer) < 0) {
-//                            // merge towards tail in reverse dir
-//                            sendUpdateMsgToSuccessor();
-//                        }
-//                    }
-//                }
-//            }
-//            this.activate();
-//        }
-//        else if (getSuperstep() % 4 == 0){
-//            //update neighber
-//            while (msgIterator.hasNext()) {
-//                incomingMsg = msgIterator.next();
-//                processUpdate();
-//                if(isHaltNode())
-//                    voteToHalt();
-//                else
-//                    this.activate();
-//            }
-//        } else if (getSuperstep() % 4 == 1){
-//            //send message to the merge object and kill self
-//            broadcastMergeMsg();
-//        } else if (getSuperstep() % 4 == 2){
-//            //merge tmpKmer
-//            while (msgIterator.hasNext()) {
-//                incomingMsg = msgIterator.next();
-//                selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
-//                /** process merge **/
-//                processMerge();
-//                // set statistics counter: MergedNodes
-//                updateStatisticsCounter(StatisticsCounter.MergedNodes);
-//                /** if it's a tandem repeat, which means detecting cycle **/
-//                if(isTandemRepeat()){
-//                    for(byte d : DirectionFlag.values)
-//                        getVertexValue().getEdgeList(d).reset();
-//                    getVertexValue().setState(MessageFlag.IS_HALT);
-//                    // set statistics counter: TandemRepeats
-//                    updateStatisticsCounter(StatisticsCounter.TandemRepeats);
-//                    voteToHalt();
-//                }/** head meets head, stop **/ 
-//                else if((getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)){
-//                    getVertexValue().setState(MessageFlag.IS_HALT);
-//                    // set statistics counter: MergedPaths
-//                    updateStatisticsCounter(StatisticsCounter.MergedPaths);
-//                    voteToHalt();
-//                }
-//                else
-//                    this.activate();
-//            }
-//        }
+            this.activate();
+        }
+        else if (getSuperstep() % 4 == 0){
+            //update neighber
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                processUpdate();
+                if(isHaltNode())
+                    voteToHalt();
+                else
+                    this.activate();
+            }
+        } else if (getSuperstep() % 4 == 1){
+            //send message to the merge object and kill self
+            broadcastMergeMsg();
+        } else if (getSuperstep() % 4 == 2){
+            //merge tmpKmer
+            while (msgIterator.hasNext()) {
+                incomingMsg = msgIterator.next();
+                selfFlag = (byte) (State.VERTEX_MASK & getVertexValue().getState());
+                /** process merge **/
+                processMerge();
+                // set statistics counter: Num_MergedNodes
+                updateStatisticsCounter(StatisticsCounter.Num_MergedNodes);
+                /** if it's a tandem repeat, which means detecting cycle **/
+                if(isTandemRepeat()){
+                    for(byte d : DirectionFlag.values)
+                        getVertexValue().getEdgeList(d).reset();
+                    getVertexValue().setState(MessageFlag.IS_HALT);
+                    // set statistics counter: Num_TandemRepeats
+                    updateStatisticsCounter(StatisticsCounter.Num_TandemRepeats);
+                    getVertexValue().setCounters(counters);
+                    voteToHalt();
+                }/** head meets head, stop **/ 
+                else if((getMsgFlag() == MessageFlag.IS_HEAD && selfFlag == MessageFlag.IS_HEAD)){
+                    getVertexValue().setState(MessageFlag.IS_HALT);
+                    // set statistics counter: Num_MergedPaths
+                    updateStatisticsCounter(StatisticsCounter.Num_MergedPaths);
+                    getVertexValue().setCounters(counters);
+                    voteToHalt();
+                }
+                else{
+                    getVertexValue().setCounters(counters);
+                    this.activate();
+                }
+            }
+        }
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
index 161a41e..8c5ebb0 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/tipremove/TipRemoveVertex.java
@@ -11,6 +11,8 @@
 import edu.uci.ics.genomix.pregelix.io.MessageWritable;
 import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
 import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
+import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
 import edu.uci.ics.genomix.pregelix.util.VertexUtil;
 import edu.uci.ics.genomix.type.VKmerBytesWritable;
 
@@ -65,6 +67,10 @@
             outgoingMsg.reset();
         if(destVertexId == null)
             destVertexId = new VKmerBytesWritable();
+        if(getSuperstep() > 1)
+            StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
+        counters.clear();
+        getVertexValue().getCounters().clear();
     }
 
     @Override
@@ -75,18 +81,27 @@
             	if(getVertexValue().getKmerLength() <= length){
             	    sendSettledMsgToNextNode();
             		deleteVertex(getVertexId());
+                    //set statistics counter: Num_RemovedTips
+                    updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+                    getVertexValue().setCounters(counters);
             	}
             }
             else if(VertexUtil.isOutgoingTipVertex(getVertexValue())){
                 if(getVertexValue().getKmerLength() <= length){
-
                     sendSettledMsgToPrevNode();
                     deleteVertex(getVertexId());
+                    //set statistics counter: Num_RemovedTips
+                    updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+                    getVertexValue().setCounters(counters);
                 }
             }
             else if(VertexUtil.isSingleVertex(getVertexValue())){
-                if(getVertexValue().getKmerLength() <= length)
+                if(getVertexValue().getKmerLength() <= length){
                     deleteVertex(getVertexId());
+                    //set statistics counter: Num_RemovedTips
+                    updateStatisticsCounter(StatisticsCounter.Num_RemovedTips);
+                    getVertexValue().setCounters(counters);
+                }
             }
         }
         else if(getSuperstep() == 2){
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
index fa7ee42..962699c 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/type/StatisticsCounter.java
@@ -1,7 +1,29 @@
 package edu.uci.ics.genomix.pregelix.type;
 
 public class StatisticsCounter {
-    public static final byte MergedNodes = 0b00 << 0;
-    public static final byte MergedPaths = 0b01 << 0;
-    public static final byte TandemRepeats = 0b10 << 0;
+    public static final byte Num_MergedNodes = 0b00 << 0;
+    public static final byte Num_MergedPaths = 0b01 << 0;
+    public static final byte Num_TandemRepeats = 0b10 << 0;
+    public static final byte Num_RemovedTips = 0b11 << 0;
+    
+    public final static class COUNTER_CONTENT{
+        public static String getContent(byte code){
+            String r = "";
+            switch(code){
+                case Num_MergedNodes:
+                    r = "num of merged nodes";
+                    break;
+                case Num_MergedPaths:
+                    r = "num of merge paths";
+                    break;
+                case Num_TandemRepeats:
+                    r = "num of tandem repeats";
+                    break;
+                case Num_RemovedTips:
+                    r = "num of removed tips";
+                    break;
+            }
+            return r;
+        }
+    }
 }
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
index 315f3aa..f2c0501 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobGen/JobGenerator.java
@@ -148,6 +148,7 @@
     private static void generateTipAddGraphJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(new GenomixJobConf(3), jobName);
         job.setVertexClass(TipAddVertex.class);
+        job.setGlobalAggregatorClass(StatisticsAggregator.class);
         job.setVertexInputFormatClass(InitialGraphCleanInputFormat.class);
         job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
         job.setDynamicVertexValueSize(true);
@@ -164,6 +165,7 @@
     private static void generateTipRemoveGraphJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(new GenomixJobConf(3), jobName);
         job.setVertexClass(TipRemoveVertex.class);
+        job.setGlobalAggregatorClass(StatisticsAggregator.class);
         job.setVertexInputFormatClass(GraphCleanInputFormat.class);
         job.setVertexOutputFormatClass(GraphCleanOutputFormat.class);
         job.setDynamicVertexValueSize(true);
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
index 4d53925..11c5fb5 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.genomix.pregelix.io.VLongWritable;
 import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
 import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
+import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
@@ -66,6 +67,16 @@
         }
     }
 
+    public void outputCounters(HashMapWritable<ByteWritable, VLongWritable> counters){
+        String output = "";
+        for(ByteWritable counterName : counters.keySet()){
+            output += StatisticsCounter.COUNTER_CONTENT.getContent(counterName.get());
+            output += ": ";
+            output += counters.get(counterName).toString() + "\n";
+        }
+        System.out.println(output);
+    }
+    
     @Test
     public void test() throws Exception {
         setUp();
@@ -73,8 +84,10 @@
         for (Plan plan : plans) {
             driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
                     PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
-            HashMapWritable<ByteWritable, VLongWritable> counters = BasicGraphCleanVertex.readStatisticsCounterResult(job.getConfiguration());
-            System.out.println(counters.toString());
+//            HashMapWritable<ByteWritable, VLongWritable> counters = BasicGraphCleanVertex.readStatisticsCounterResult(job.getConfiguration());
+//            //output counters
+//            outputCounters(counters);
+//            System.out.println("");
         }
         compareResults();
         tearDown();
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
index 81dc9ba..33e0650 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/TipRemoveTestSuite.java
@@ -6,7 +6,7 @@
 
     public static Test suite() throws Exception {
         String pattern ="TipRemove";
-        String testSet[] = {"SmallGenome", "FR_Tip", "RF_Tip"};
+        String testSet[] = {"FR_Tip"};//{"SmallGenome", "FR_Tip", "RF_Tip"};
         init(pattern, testSet);
         BasicGraphCleanTestSuite testSuite = new BasicGraphCleanTestSuite();
         return makeTestSuite(testSuite);