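Rename a few classes: replace VertexCombiner with MessageCombiner in the example combiners and add pregelix.aggregatorClass to the test job configs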

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2075 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index c166903..c0e87d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -23,9 +23,11 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
@@ -41,27 +43,44 @@
     /**
      * Test whether combiner is called to get the minimum ID in the cluster
      */
-    public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, VLongWritable> {
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, VLongWritable, VLongWritable> {
         private long min = Long.MAX_VALUE;
         private VLongWritable agg = new VLongWritable();
+        private MsgList<VLongWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, VLongWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
             long value = msg.get();
             if (min > value)
                 min = value;
         }
 
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         @Override
-        public void init() {
+        public void init(MsgList msgList) {
             min = Long.MAX_VALUE;
+            this.msgList = msgList;
         }
 
         @Override
-        public VLongWritable finish() {
+        public void step(VLongWritable partialAggregate) throws HyracksDataException {
+            if (min > partialAggregate.get())
+                min = partialAggregate.get();
+        }
+
+        @Override
+        public VLongWritable finishPartial() {
             agg.set(min);
             return agg;
         }
+
+        @Override
+        public MsgList<VLongWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
     }
 
     private VLongWritable outputValue = new VLongWritable();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 2dd639d..0a6402c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -30,8 +30,10 @@
 
 import com.google.common.collect.Maps;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexReader;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexInputFormat;
@@ -56,24 +58,40 @@
     /**
      * Test whether combiner is called by summing up the messages.
      */
-    public static class SimpleSumCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+    public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
         private double sum = 0.0;
         private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            sum = 0.0;
+            this.msgList = msgList;
+        }
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             sum += msg.get();
         }
 
         @Override
-        public void init() {
-            sum = 0.0;
+        public DoubleWritable finishPartial() {
+            agg.set(sum);
+            return agg;
         }
 
         @Override
-        public DoubleWritable finish() {
+        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+            sum += partialAggregate.get();
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
             agg.set(sum);
-            return agg;
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
         }
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index a5eee58..b039c08 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.pregelix.example;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -23,9 +22,11 @@
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.client.Client;
@@ -39,27 +40,45 @@
     /**
      * Test whether combiner is called by summing up the messages.
      */
-    public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
         private double min = Double.MAX_VALUE;
         private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             double value = msg.get();
             if (min > value)
                 min = value;
         }
 
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
-        public void init() {
+        public void init(MsgList msgList) {
             min = Double.MAX_VALUE;
+            this.msgList = msgList;
         }
 
         @Override
-        public DoubleWritable finish() {
+        public DoubleWritable finishPartial() {
             agg.set(min);
             return agg;
         }
+
+        @Override
+        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+            double value = partialAggregate.get();
+            if (min > value)
+                min = value;
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
     }
 
     private DoubleWritable outputValue = new DoubleWritable();
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 22ae6cf..e953261 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 50662f9..ffff302 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index e425b38..5eba09a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b51bd98..76905cd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index a9e43bd..3f8a04f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 3719247..f9c4bcd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -125,6 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 90caf6b..c415cba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -125,6 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>