add global aggregator support

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2066 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 9920b4c..329e48a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -5,26 +5,43 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 @SuppressWarnings("rawtypes")
-public interface GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> {
-	/**
-	 * initialize combiner
-	 */
-	public void init();
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+    /**
+     * initialize the global aggregator
+     */
+    public abstract void init();
 
-	/**
-	 * step call
-	 * 
-	 * @param vertexIndex
-	 * @param msg
-	 * @throws IOException
-	 */
-	public void step(Vertex<I, V, E, M> v) throws IOException;
+    /**
+     * step through all vertices at each slave partition
+     * 
+     * @param v
+     *            the vertex to aggregate
+     * @throws HyracksDataException
+     */
+    public abstract void step(Vertex<I, V, E, M> v) throws HyracksDataException;
 
-	/**
-	 * finish aggregate
-	 * 
-	 * @return the final aggregate value
-	 */
-	public T finish();
+    /**
+     * step through all intermediate aggregate results
+     * 
+     * @param partialResult partial aggregate value
+     */
+    public abstract void step(P partialResult);
+
+    
+    /**
+     * finish partial aggregate
+     * 
+     * @return the partial aggregate value
+     */
+    public abstract P finishPartial();
+    
+    /**
+     * finish final aggregate
+     * 
+     * @return the final aggregate value
+     */
+    public abstract F finishFinal();
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 61c2cae..73b434a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.pregelix.api.graph.VertexCombiner;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
 
 /**
  * This class represents a Pregelix job.
@@ -50,8 +51,10 @@
     public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
     /** Message value class */
     public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
-    /** Aggregate value class */
-    public static final String Aggregate_VALUE_CLASS = "pregelix.aggregateValueClass";
+    /** Partial aggregate value class */
+    public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
+    /** Final aggregate value class */
+    public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
     /** num of vertices */
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
@@ -66,6 +69,7 @@
      */
     public PregelixJob(String jobName) throws IOException {
         super(new Configuration(), jobName);
+        getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, GlobalCountAggregator.class, GlobalAggregator.class);
     }
 
     /**
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 2a25bdb..9d1084f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -117,10 +117,10 @@
      * @return User's vertex combiner class
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> Class<? extends GlobalAggregator<I, V, E, M, T>> getGlobalAggregatorClass(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
             Configuration conf) {
-        return (Class<? extends GlobalAggregator<I, V, E, M, T>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
-                null, GlobalAggregator.class);
+        return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
+                GlobalAggregator.class);
     }
 
     /**
@@ -145,9 +145,9 @@
      * @return Instantiated user vertex combiner class
      */
     @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> GlobalAggregator<I, V, E, M, T> createGlobalAggregator(
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
             Configuration conf) {
-        Class<? extends GlobalAggregator<I, V, E, M, T>> globalAggregatorClass = getGlobalAggregatorClass(conf);
+        Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
         return ReflectionUtils.newInstance(globalAggregatorClass, conf);
     }
 
@@ -287,17 +287,31 @@
     }
 
     /**
-     * Get the user's subclassed global aggregator value class.
+     * Get the user's subclassed global aggregator's partial aggregate value class.
      * 
      * @param conf
      *            Configuration to check
      * @return User's global aggregate value class
      */
     @SuppressWarnings("unchecked")
-    public static <M extends Writable> Class<M> getAggregateValueClass(Configuration conf) {
+    public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
         if (conf == null)
             conf = defaultConf;
-        return (Class<M>) conf.getClass(PregelixJob.Aggregate_VALUE_CLASS, Writable.class);
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed global aggregator's final aggregate value class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's final aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
     }
 
     /**
@@ -325,8 +339,26 @@
      *            Configuration to check
      * @return Instantiated user aggregate value
      */
-    public static <M extends Writable> M createAggregateValue(Configuration conf) {
-        Class<M> aggregateValueClass = getAggregateValueClass(conf);
+    public static <M extends Writable> M createPartialAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getPartialAggregateValueClass(conf);
+        try {
+            return aggregateValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user final aggregate value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user final aggregate value
+     */
+    public static <M extends Writable> M createFinalAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getFinalAggregateValueClass(conf);
         try {
             return aggregateValueClass.newInstance();
         } catch (InstantiationException e) {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7add10f..63c1d8c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -196,6 +196,7 @@
     }
 
     private void execute(JobSpecification job) throws Exception {
+        job.setUseConnectorPolicyForScheduling(false);
         JobId jobId = hcc.startJob(applicationName, job,
                 profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index e6007f9..2dddccc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -123,8 +123,11 @@
         Class aggregatorClass = conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS, GlobalAggregator.class);
         if (!aggregatorClass.equals(GlobalAggregator.class)) {
             List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
-            Type aggregateValueType = argTypes.get(4);
-            conf.setClass(PregelixJob.Aggregate_VALUE_CLASS, (Class<?>) aggregateValueType, Writable.class);
+            Type partialAggregateValueType = argTypes.get(4);
+            conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+                    Writable.class);
+            Type finalAggregateValueType = argTypes.get(5);
+            conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
         }
     }
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 6ea6358..5852b72 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -82,6 +83,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         JobSpecification spec = new JobSpecification();
 
@@ -118,6 +120,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -128,8 +132,9 @@
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3, StartComputeUpdateFunctionFactory.INSTANCE,
-                preHookFactory, null, rdMessage, rdDummy, rdFinal);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate, rdFinal);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -140,6 +145,15 @@
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
 
         /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+        /**
          * construct bulk-load index operator
          */
         int[] fieldPermutation = new int[] { 0, 1 };
@@ -212,7 +226,9 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 2, btreeBulkLoad, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+                finalAggregator, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -223,6 +239,7 @@
         spec.addRoot(emptySink);
         spec.addRoot(btreeBulkLoad);
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -235,6 +252,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         JobSpecification spec = new JobSpecification();
 
         /**
@@ -295,6 +313,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -303,8 +323,8 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3, ComputeUpdateFunctionFactory.INSTANCE,
-                preHookFactory, null, rdMessage, rdDummy, rdFinal);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdMessage, rdDummy, rdPartialAggregate, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -371,8 +391,17 @@
         TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -383,7 +412,10 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 2, btreeBulkLoad, 0);
+
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+                finalAggregator, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 1dda68e..f1951f2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -83,6 +84,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         JobSpecification spec = new JobSpecification();
 
@@ -110,19 +112,22 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
-        
+
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
-                typeTraits, comparatorFactories,
-                JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
-                inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
@@ -181,6 +186,15 @@
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -189,6 +203,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -197,6 +213,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
@@ -210,6 +227,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         JobSpecification spec = new JobSpecification();
 
         /**
@@ -262,6 +280,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -270,8 +290,9 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
-                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -325,6 +346,15 @@
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -336,6 +366,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -344,6 +376,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 7453cde..d075929 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
@@ -82,6 +83,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         JobSpecification spec = new JobSpecification();
 
@@ -116,6 +118,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -123,8 +127,9 @@
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE,
-                preHookFactory, null, rdMessage, rdDummy);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -173,8 +178,17 @@
         TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
+        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -183,12 +197,15 @@
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
 
         spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
@@ -202,6 +219,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         JobSpecification spec = new JobSpecification();
 
         /**
@@ -254,6 +272,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -262,8 +282,9 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
-                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -309,6 +330,15 @@
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -320,12 +350,15 @@
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
 
         spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 070f114..7c0b259 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
@@ -81,6 +82,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
         JobSpecification spec = new JobSpecification();
 
@@ -112,6 +114,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -119,8 +123,9 @@
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE,
-                preHookFactory, null, rdMessage, rdDummy);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
@@ -186,6 +191,15 @@
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -194,6 +208,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -202,6 +218,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
 
         spec.setFrameSize(frameSize);
@@ -214,6 +231,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
         Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
         Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+        Class<? extends Writable> partialAggregateValueClass = BspUtils.getPartialAggregateValueClass(conf);
         JobSpecification spec = new JobSpecification();
 
         /**
@@ -266,6 +284,8 @@
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
         RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MessageList.class.getName());
+        RecordDescriptor rdPartialAggregate = DataflowUtils
+                .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
@@ -274,8 +294,9 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
-                ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -336,6 +357,15 @@
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
 
+        /**
+         * final aggregate write operator
+         */
+        IRecordDescriptorFactory aggRdFactory = DataflowUtils
+                .getWritableRecordDescriptorFactoryFromWritableClasses(partialAggregateValueClass.getName());
+        FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
+                configurationFactory, aggRdFactory, jobId);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -347,6 +377,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+                finalAggregator, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -355,6 +387,7 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
 
         spec.addRoot(terminateWriter);
+        spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
 
         spec.setFrameSize(frameSize);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index f57b3fa..f1dff72 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,6 +30,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
 import edu.uci.ics.pregelix.dataflow.context.StateKey;
 
@@ -89,6 +91,39 @@
         }
     }
 
+    /**
+     * Persist the global aggregate value of a job to the file system configured
+     * in conf, overwriting any value written earlier under the same path.
+     * The value is stored at TMP_DIR + jobId + "agg".
+     *
+     * @param conf
+     *            the configuration used to obtain the file system
+     * @param jobId
+     *            the job identifier, used to build the file path
+     * @param agg
+     *            the aggregate value to serialize
+     * @throws HyracksDataException
+     *             if the file cannot be created or written
+     */
+    public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
+            throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+            Path path = new Path(pathStr);
+            FSDataOutputStream output = dfs.create(path, true);
+            try {
+                agg.write(output);
+                output.flush();
+            } finally {
+                // release the stream even when serialization fails
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
     public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
@@ -103,4 +120,37 @@
         }
     }
 
+    /**
+     * Load the global aggregate value stored for a job from the file system
+     * configured in conf. The value is read from TMP_DIR + jobId + "agg" into
+     * an instance created by BspUtils.createFinalAggregateValue(conf).
+     *
+     * @param conf
+     *            the configuration used to obtain the file system and to
+     *            create the aggregate value instance
+     * @param jobId
+     *            the job identifier, used to build the file path
+     * @return the deserialized aggregate value
+     * @throws HyracksDataException
+     *             if the file cannot be opened or read
+     */
+    public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+            Path path = new Path(pathStr);
+            FSDataInputStream input = dfs.open(path);
+            try {
+                Writable agg = BspUtils.createFinalAggregateValue(conf);
+                agg.readFields(input);
+                return agg;
+            } finally {
+                // release the stream even when deserialization fails
+                input.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ed0ecb6..105d3e2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.io.Writable;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -28,10 +30,13 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@@ -39,7 +44,24 @@
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
     private static final long serialVersionUID = 1L;
-    public static IUpdateFunctionFactory INSTANCE = new ComputeUpdateFunctionFactory();
+    private final IConfigurationFactory confFactory;
+
+    /**
+     * Create a factory whose functions obtain their configuration from the
+     * given configuration factory.
+     *
+     * @param confFactory
+     *            the configuration factory; must not be null
+     * @throws IllegalArgumentException
+     *             if confFactory is null
+     */
+    public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+        if (confFactory == null) {
+            /** fail here rather than with a NullPointerException when open() dereferences the factory */
+            throw new IllegalArgumentException("confFactory must not be null");
+        }
+        this.confFactory = confFactory;
+    }
 
     @Override
     public IUpdateFunction createFunction() {
@@ -48,6 +57,7 @@
             private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -66,6 +76,12 @@
             private ByteBuffer bufferTerminate;
             private boolean terminate = true;
 
+            // for writing out the global aggregate channel
+            private IFrameWriter writerGlobalAggregate;
+            private FrameTupleAppender appenderGlobalAggregate;
+            private ByteBuffer bufferGlobalAggregate;
+            private GlobalAggregator aggregator;
+
             private Vertex vertex;
             private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
             private DataOutput output = new DataOutputStream(bbos);
@@ -78,6 +94,9 @@
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
+                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.aggregator.init();
+
                 this.writerMsg = writers[0];
                 this.bufferMsg = ctx.allocateFrame();
                 this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
@@ -90,8 +109,13 @@
                 this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderTerminate.reset(bufferTerminate, true);
 
-                if (writers.length > 2) {
-                    this.writerAlive = writers[2];
+                this.writerGlobalAggregate = writers[2];
+                this.bufferGlobalAggregate = ctx.allocateFrame();
+                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+
+                if (writers.length > 3) {
+                    this.writerAlive = writers[3];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -133,6 +157,8 @@
                  */
                 if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
                     terminate = false;
+
+                aggregator.step(vertex);
             }
 
             @Override
@@ -143,6 +169,38 @@
                 if (!terminate) {
                     writeOutTerminationState();
                 }
+
+                /** write out global aggregate value */
+                writeOutGlobalAggregate();
+            }
+
+            /**
+             * Serialize the partial aggregate of this function instance as a
+             * single-field tuple and flush it to the global aggregate writer.
+             *
+             * @throws HyracksDataException
+             *             if writing the partial aggregate raises an IOException
+             * @throws IllegalStateException
+             *             if the tuple cannot be appended to the output frame
+             */
+            private void writeOutGlobalAggregate() throws HyracksDataException {
+                try {
+                    /**
+                     * get partial aggregate result and flush to the final aggregator
+                     */
+                    Writable agg = aggregator.finishPartial();
+                    agg.write(tbGlobalAggregate.getDataOutput());
+                    tbGlobalAggregate.addFieldEndOffset();
+                    boolean appended = appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appended) {
+                        /** a dropped tuple would silently leave this partial result out of the final aggregate */
+                        throw new IllegalStateException("failed to append partial aggregate tuple to frame");
+                    }
+                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
             private void writeOutTerminationState() throws HyracksDataException {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 9ea8215..f72b059 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.io.Writable;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -28,10 +30,13 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.ArrayListWritable.ArrayIterator;
+import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
 import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@@ -39,7 +44,24 @@
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
     private static final long serialVersionUID = 1L;
-    public static IUpdateFunctionFactory INSTANCE = new StartComputeUpdateFunctionFactory();
+    private final IConfigurationFactory confFactory;
+
+    /**
+     * Create a factory whose functions obtain their configuration from the
+     * given configuration factory.
+     *
+     * @param confFactory
+     *            the configuration factory; must not be null
+     * @throws IllegalArgumentException
+     *             if confFactory is null
+     */
+    public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+        if (confFactory == null) {
+            /** fail here rather than with a NullPointerException when open() dereferences the factory */
+            throw new IllegalArgumentException("confFactory must not be null");
+        }
+        this.confFactory = confFactory;
+    }
 
     @Override
     public IUpdateFunction createFunction() {
@@ -48,6 +57,7 @@
             private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -61,6 +71,12 @@
             private boolean pushAlive;
 
+            // for writing out the global aggregate
+            private IFrameWriter writerGlobalAggregate;
+            private FrameTupleAppender appenderGlobalAggregate;
+            private ByteBuffer bufferGlobalAggregate;
+            private GlobalAggregator aggregator;
+
             // for writing out termination detection control channel
             private IFrameWriter writerTerminate;
             private FrameTupleAppender appenderTerminate;
             private ByteBuffer bufferTerminate;
@@ -81,6 +97,9 @@
             @Override
             public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
                     throws HyracksDataException {
+                this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+                this.aggregator.init();
+
                 this.writerMsg = writers[0];
                 this.bufferMsg = ctx.allocateFrame();
                 this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
@@ -93,8 +112,13 @@
                 this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderTerminate.reset(bufferTerminate, true);
 
-                if (writers.length > 2) {
-                    this.writerAlive = writers[2];
+                this.writerGlobalAggregate = writers[2];
+                this.bufferGlobalAggregate = ctx.allocateFrame();
+                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+
+                if (writers.length > 3) {
+                    this.writerAlive = writers[3];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -134,6 +158,11 @@
                  */
                 if (terminate && (!vertex.isHalted() || vertex.hasMessage()))
                     terminate = false;
+
+                /**
+                 * call the global aggregator
+                 */
+                aggregator.step(vertex);
             }
 
             @Override
@@ -144,6 +173,38 @@
                 if (!terminate) {
                     writeOutTerminationState();
                 }
+
+                /** write out global aggregate value */
+                writeOutGlobalAggregate();
+            }
+
+            /**
+             * Serialize the partial aggregate of this function instance as a
+             * single-field tuple and flush it to the global aggregate writer.
+             *
+             * @throws HyracksDataException
+             *             if writing the partial aggregate raises an IOException
+             * @throws IllegalStateException
+             *             if the tuple cannot be appended to the output frame
+             */
+            private void writeOutGlobalAggregate() throws HyracksDataException {
+                try {
+                    /**
+                     * get partial aggregate result and flush to the final aggregator
+                     */
+                    Writable agg = aggregator.finishPartial();
+                    agg.write(tbGlobalAggregate.getDataOutput());
+                    tbGlobalAggregate.addFieldEndOffset();
+                    boolean appended = appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appended) {
+                        /** a dropped tuple would silently leave this partial result out of the final aggregate */
+                        throw new IllegalStateException("failed to append partial aggregate tuple to frame");
+                    }
+                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
             private void writeOutTerminationState() throws HyracksDataException {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
deleted file mode 100755
index a8bea45..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.runtime.simpleagg;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
-import edu.uci.ics.pregelix.api.graph.Vertex;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class GlobalAggregationFunction implements IAggregateFunction {
-    private final Configuration conf;
-    private final DataOutput output;
-    private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
-    private DataInput valueInput = new DataInputStream(valueInputStream);
-    private GlobalAggregator globalAggregator;
-    private Vertex vertex;
-    private Writable aggregateResult;
-
-    public GlobalAggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
-            throws HyracksDataException {
-        this.conf = confFactory.createConfiguration();
-        this.output = output;
-
-        vertex = BspUtils.createVertex(conf);
-        aggregateResult = BspUtils.createAggregateValue(conf);
-        globalAggregator = BspUtils.createGlobalAggregator(conf);
-    }
-
-    @Override
-    public void init() throws HyracksDataException {
-
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple) throws HyracksDataException {
-        FrameTupleReference ftr = (FrameTupleReference) tuple;
-        IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
-        ByteBuffer buffer = fta.getBuffer();
-        int tIndex = ftr.getTupleIndex();
-
-        int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
-                + fta.getFieldStartOffset(tIndex, 1);
-
-        valueInputStream.setByteBuffer(buffer, valueStart);
-        try {
-            vertex.readFields(valueInput);
-            globalAggregator.step(vertex);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-
-    }
-
-    @Override
-    public void finish() throws HyracksDataException {
-        try {
-            aggregateResult = globalAggregator.finish();
-            aggregateResult.write(output);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
deleted file mode 100755
index 519634d..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.runtime.simpleagg;
-
-import java.io.DataOutput;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
-import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
-
-public class GlobalAggregationFunctionFactory implements IAggregateFunctionFactory {
-    private static final long serialVersionUID = 1L;
-    private final IConfigurationFactory confFactory;
-    private final boolean isFinalStage;
-
-    public GlobalAggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
-        this.confFactory = confFactory;
-        this.isFinalStage = isFinalStage;
-    }
-
-    @Override
-    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
-        DataOutput output = provider.getDataOutput();
-        return new GlobalAggregationFunction(confFactory, output, isFinalStage);
-    }
-}