rename a few classes

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2075 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
similarity index 62%
rename from pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
rename to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index 99f3f56..fd540fd 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * interface to implement for combining of messages sent to the same vertex.
  * 
@@ -27,26 +29,40 @@
  * @param <M extends Writable> message data
  */
 @SuppressWarnings("rawtypes")
-public interface VertexCombiner<I extends WritableComparable, M extends Writable> {
+public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
 
     /**
      * initialize combiner
      */
-    public void init();
+    public abstract void init(MsgList providedMsgList);
 
     /**
-     * step call
+     * step call for local combiner
      * 
      * @param vertexIndex
      * @param msg
      * @throws IOException
      */
-    public void step(I vertexIndex, M msg) throws IOException;
+    public abstract void step(I vertexIndex, M msg) throws HyracksDataException;
 
     /**
-     * finish aggregate
+     * step call for global combiner
+     * 
+     * @param vertexIndex
+     * @param msg
+     * @throws IOException
+     */
+    public abstract void step(P partialAggregate) throws HyracksDataException;
+
+    /**
+     * finish partial combiner
+     */
+    public abstract P finishPartial();
+
+    /**
+     * finish final combiner
      * 
      * @return Message
      */
-    public M finish();
+    public abstract MsgList<M> finishFinal();
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
index fcff837..734b1af 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -32,7 +32,7 @@
     private static final long serialVersionUID = 100L;
 
     /**
-     * Default constructor.
+     * Default constructor.s
      */
     public MsgList() {
         super();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 73b434a..e24439f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -21,11 +21,10 @@
 import org.apache.hadoop.mapreduce.Job;
 
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
-import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
 
 /**
  * This class represents a Pregelix job.
@@ -38,7 +37,7 @@
     /** VertexOutputFormat class - optional */
     public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
     /** Vertex combiner class - optional */
-    public static final String VERTEX_COMBINER_CLASS = "pregelix.combinerClass";
+    public static final String Message_COMBINER_CLASS = "pregelix.combinerClass";
     /** Global aggregator class - optional */
     public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
     /** Vertex resolver class - optional */
@@ -51,6 +50,8 @@
     public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
     /** Message value class */
     public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
+    /** Partial combiner value class */
+    public static final String PARTIAL_COMBINE_VALUE_CLASS = "pregelix.partialCombinedValueClass";
     /** Partial aggregate value class */
     public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
     /** Final aggregate value class */
@@ -69,7 +70,6 @@
      */
     public PregelixJob(String jobName) throws IOException {
         super(new Configuration(), jobName);
-        getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, GlobalCountAggregator.class, GlobalAggregator.class);
     }
 
     /**
@@ -122,7 +122,7 @@
      *            Determines how vertex messages are combined
      */
     final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
-        getConfiguration().setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, VertexCombiner.class);
+        getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
     }
 
     /**
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 0e59c8d9..265fd3e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.pregelix.api.util;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,6 +28,8 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
 /**
  * A Writable for ListArray containing instances of a class.
  */
@@ -41,6 +44,14 @@
     private List<M> pool = new ArrayList<M>();
     /** how many instance in the pool has been used */
     private int used = 0;
+    /** intermediate buffer for copy data element */
+    private final ArrayBackedValueStorage intermediateBuffer = new ArrayBackedValueStorage();
+    /** intermediate data output */
+    private final DataOutput intermediateOutput = intermediateBuffer.getDataOutput();
+    /** input stream */
+    private final ResetableByteArrayInputStream inputStream = new ResetableByteArrayInputStream();
+    /** data input */
+    private final DataInput dataInput = new DataInputStream(inputStream);
 
     /**
      * Using the default constructor requires that the user implement
@@ -51,6 +62,49 @@
     }
 
     /**
+     * clear all the elements
+     */
+    public void clearElements() {
+        this.used = 0;
+        this.clear();
+    }
+
+    /**
+     * Add all elements from another list
+     * 
+     * @param list
+     *            the list of M
+     * @return true if successful, else false
+     */
+    public boolean addAllElements(List<M> list) {
+        for (int i = 0; i < list.size(); i++) {
+            addElement(list.get(i));
+        }
+        return true;
+    }
+
+    /**
+     * Add an element
+     * 
+     * @param element
+     *            M
+     * @return true if successful, else false
+     */
+    public boolean addElement(M element) {
+        try {
+            intermediateBuffer.reset();
+            element.write(intermediateOutput);
+            inputStream.setByteArray(intermediateBuffer.getByteArray(), 0);
+            M value = allocateValue();
+            value.readFields(dataInput);
+            add(value);
+            return true;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
      * This constructor allows setting the refClass during construction.
      * 
      * @param refClass
@@ -109,6 +163,9 @@
 
     public final void setConf(Configuration conf) {
         this.conf = conf;
+        if (this.refClass == null) {
+            setClass();
+        }
     }
 
     private M allocateValue() {
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 9d1084f..d6bf24f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -21,8 +21,9 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -96,17 +97,17 @@
     }
 
     /**
-     * Get the user's subclassed {@link VertexCombiner}.
+     * Get the user's subclassed {@link MessageCombiner}.
      * 
      * @param conf
      *            Configuration to check
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, M extends Writable> Class<? extends VertexCombiner<I, M>> getVertexCombinerClass(
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
             Configuration conf) {
-        return (Class<? extends VertexCombiner<I, M>>) conf.getClass(PregelixJob.VERTEX_COMBINER_CLASS, null,
-                VertexCombiner.class);
+        return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
+                DefaultMessageCombiner.class, MessageCombiner.class);
     }
 
     /**
@@ -120,7 +121,7 @@
     public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
             Configuration conf) {
         return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
-                GlobalAggregator.class);
+                GlobalCountAggregator.class, GlobalAggregator.class);
     }
 
     /**
@@ -131,9 +132,9 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, M extends Writable> VertexCombiner<I, M> createVertexCombiner(
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
             Configuration conf) {
-        Class<? extends VertexCombiner<I, M>> vertexCombinerClass = getVertexCombinerClass(conf);
+        Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
         return ReflectionUtils.newInstance(vertexCombinerClass, conf);
     }
 
@@ -301,6 +302,20 @@
     }
 
     /**
+     * Get the user's subclassed combiner's partial combine value class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's global aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
      * Get the user's subclassed global aggregator's global value class.
      * 
      * @param conf
@@ -333,7 +348,7 @@
     }
 
     /**
-     * Create a user aggregate value
+     * Create a user partial aggregate value
      * 
      * @param conf
      *            Configuration to check
@@ -351,6 +366,30 @@
     }
 
     /**
+     * Create a user partial combine value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user aggregate value
+     */
+    @SuppressWarnings("rawtypes")
+    public static <M extends Writable> M createPartialCombineValue(Configuration conf) {
+        Class<M> aggregateValueClass = getPartialCombineValueClass(conf);
+        try {
+            M instance = aggregateValueClass.newInstance();
+            if (instance instanceof MsgList) {
+                // set conf for msg list, if the value type is msglist
+                ((MsgList) instance).setConf(conf);
+            }
+            return instance;
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+        }
+    }
+
+    /**
      * Create a user aggregate value
      * 
      * @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
new file mode 100755
index 0000000..5702642
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayInputStream() {
+    }
+
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (remaining == 0) {
+            return -1;
+        }
+        int l = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, l);
+        position += l;
+        return l;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
new file mode 100755
index 0000000..307a3ae
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        if (position + 1 > data.length - 1)
+            throw new IndexOutOfBoundsException();
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        if (position + length > data.length - 1)
+            throw new IndexOutOfBoundsException();
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 2dddccc..e94331d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -60,6 +60,7 @@
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,6 +130,13 @@
             Type finalAggregateValueType = argTypes.get(5);
             conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
         }
+
+        Class combinerClass = conf.getClass(PregelixJob.Message_COMBINER_CLASS, MessageCombiner.class);
+        if (!combinerClass.equals(MessageCombiner.class)) {
+            List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+            Type partialCombineValueType = argTypes.get(2);
+            conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+        }
     }
 
     public String getJobId() {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index c166903..c0e87d3 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -23,9 +23,11 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
@@ -41,27 +43,44 @@
     /**
      * Test whether combiner is called to get the minimum ID in the cluster
      */
-    public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, VLongWritable> {
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, VLongWritable, VLongWritable> {
         private long min = Long.MAX_VALUE;
         private VLongWritable agg = new VLongWritable();
+        private MsgList<VLongWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, VLongWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
             long value = msg.get();
             if (min > value)
                 min = value;
         }
 
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         @Override
-        public void init() {
+        public void init(MsgList msgList) {
             min = Long.MAX_VALUE;
+            this.msgList = msgList;
         }
 
         @Override
-        public VLongWritable finish() {
+        public void step(VLongWritable partialAggregate) throws HyracksDataException {
+            if (min > partialAggregate.get())
+                min = partialAggregate.get();
+        }
+
+        @Override
+        public VLongWritable finishPartial() {
             agg.set(min);
             return agg;
         }
+
+        @Override
+        public MsgList<VLongWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
     }
 
     private VLongWritable outputValue = new VLongWritable();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 2dd639d..0a6402c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -30,8 +30,10 @@
 
 import com.google.common.collect.Maps;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexReader;
 import edu.uci.ics.pregelix.api.io.VertexWriter;
 import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexInputFormat;
@@ -56,24 +58,40 @@
     /**
      * Test whether combiner is called by summing up the messages.
      */
-    public static class SimpleSumCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+    public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
         private double sum = 0.0;
         private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            sum = 0.0;
+            this.msgList = msgList;
+        }
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             sum += msg.get();
         }
 
         @Override
-        public void init() {
-            sum = 0.0;
+        public DoubleWritable finishPartial() {
+            agg.set(sum);
+            return agg;
         }
 
         @Override
-        public DoubleWritable finish() {
+        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+            sum += partialAggregate.get();
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
             agg.set(sum);
-            return agg;
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
         }
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index a5eee58..b039c08 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.pregelix.example;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -23,9 +22,11 @@
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.client.Client;
@@ -39,27 +40,45 @@
     /**
      * Test whether combiner is called by summing up the messages.
      */
-    public static class SimpleMinCombiner implements VertexCombiner<VLongWritable, DoubleWritable> {
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
         private double min = Double.MAX_VALUE;
         private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
 
         @Override
-        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws IOException {
+        public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
             double value = msg.get();
             if (min > value)
                 min = value;
         }
 
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
-        public void init() {
+        public void init(MsgList msgList) {
             min = Double.MAX_VALUE;
+            this.msgList = msgList;
         }
 
         @Override
-        public DoubleWritable finish() {
+        public DoubleWritable finishPartial() {
             agg.set(min);
             return agg;
         }
+
+        @Override
+        public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+            double value = partialAggregate.get();
+            if (min > value)
+                min = value;
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
     }
 
     private DoubleWritable outputValue = new DoubleWritable();
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index 22ae6cf..e953261 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 50662f9..ffff302 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index e425b38..5eba09a 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index b51bd98..76905cd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index a9e43bd..3f8a04f 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -124,6 +124,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index 3719247..f9c4bcd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -125,6 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 90caf6b..c415cba 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -125,6 +125,7 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index e4697ab..071bc17 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -30,8 +30,8 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
@@ -41,7 +41,7 @@
     private final Configuration conf;
     private final boolean isFinalStage;
     private final DataOutput output;
-    private VertexCombiner combiner;
+    private MessageCombiner combiner;
     private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
     private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
     private DataInput keyInput = new DataInputStream(keyInputStream);
@@ -59,17 +59,15 @@
         this.isFinalStage = isFinalStage;
         msgList.setConf(this.conf);
 
-        combiner = BspUtils.createVertexCombiner(conf);
+        combiner = BspUtils.createMessageCombiner(conf);
         key = BspUtils.createVertexIndex(conf);
-        value = BspUtils.createMessageValue(conf);
-        combinedResult = BspUtils.createMessageValue(conf);
+        value = !isFinalStage ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
     }
 
     @Override
     public void init() throws HyracksDataException {
-        msgList.clear();
         keyRead = false;
-        combiner.init();
+        combiner.init(msgList);
     }
 
     @Override
@@ -89,10 +87,14 @@
         try {
             if (!keyRead) {
                 key.readFields(keyInput);
-                keyRead = false;
+                keyRead = true;
             }
             value.readFields(valueInput);
-            combiner.step(key, value);
+            if (!isFinalStage) {
+                combiner.step(key, value);
+            } else {
+                combiner.step(value);
+            }
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
@@ -102,13 +104,12 @@
     @Override
     public void finish() throws HyracksDataException {
         try {
-            combinedResult = combiner.finish();
             if (!isFinalStage) {
-                combinedResult.write(output);
+                combinedResult = combiner.finishPartial();
             } else {
-                msgList.add(combinedResult);
-                msgList.write(output);
+                combinedResult = combiner.finishFinal();
             }
+            combinedResult.write(output);
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }