add support for non combiner jobs

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2077 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index e24439f..c78327e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -121,7 +121,7 @@
      * @param vertexCombinerClass
      *            Determines how vertex messages are combined
      */
-    final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+    final public void setMessageCombinerClass(Class<?> vertexCombinerClass) {
         getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
     }
 
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 265fd3e..d2ba28d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,7 +141,9 @@
         used = 0;
         this.clear();
         int numValues = in.readInt(); // read number of values
-        ensureCapacity(numValues);
+        if (numValues > 100) {
+            System.out.println("num values: " + numValues);
+        }
         for (int i = 0; i < numValues; i++) {
             M value = allocateValue();
             value.readFields(in); // read a value
@@ -151,6 +153,9 @@
 
     public void write(DataOutput out) throws IOException {
         int numValues = size();
+        if (numValues > 100) {
+            System.out.println("write num values: " + numValues);
+        }
         out.writeInt(numValues); // write number of values
         for (int i = 0; i < numValues; i++) {
             get(i).write(out);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index e94331d..1ff8917 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -121,7 +121,7 @@
         conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
         conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
 
-        Class aggregatorClass = conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS, GlobalAggregator.class);
+        Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
         if (!aggregatorClass.equals(GlobalAggregator.class)) {
             List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
             Type partialAggregateValueType = argTypes.get(4);
@@ -131,7 +131,7 @@
             conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
         }
 
-        Class combinerClass = conf.getClass(PregelixJob.Message_COMBINER_CLASS, MessageCombiner.class);
+        Class combinerClass = BspUtils.getMessageCombinerClass(conf);
         if (!combinerClass.equals(MessageCombiner.class)) {
             List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
             Type partialCombineValueType = argTypes.get(2);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 45d3687..00cdf07 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -180,7 +180,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -188,8 +189,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -348,7 +349,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -356,8 +358,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 8252555..3847aa7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -146,7 +146,7 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -157,7 +157,7 @@
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
         IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+                .getAccumulatingAggregatorFactory(conf, true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -306,7 +306,7 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -315,7 +315,7 @@
          * construct global group-by operator
          */
         IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+                .getAccumulatingAggregatorFactory(conf, true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 68ea01f..ec783a7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -149,8 +149,8 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, false);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -297,8 +297,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, false);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 0792e1d..bb939e3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -143,7 +143,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -160,8 +161,8 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -309,7 +310,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -324,8 +326,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index aa389e2..bcf3ffc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -71,9 +71,10 @@
         return rdFactory;
     }
 
-    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal) {
+    public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal,
+            boolean partialAggAsInput) {
         IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
-                isFinal);
+                isFinal, partialAggAsInput);
         IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
                 new IAggregateFunctionFactory[] { aggFuncFactory });
         return aggregatorFactory;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index c0e87d3..b51571c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -120,7 +120,7 @@
         job.setVertexClass(ConnectedComponentsVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
-        job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
         Client.run(args, job);
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 0a6402c..7e21245 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -207,7 +207,7 @@
         job.setVertexClass(PageRankVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         Client.run(args, job);
     }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index b039c08..89ea951 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -132,7 +132,7 @@
         job.setVertexClass(ShortestPathsVertex.class);
         job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
         job.getConfiguration().setLong(SOURCE_ID, 0);
         Client.run(args, job);
     }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index fb7dc6c..99fec55 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -46,7 +46,7 @@
         job.setVertexClass(PageRankVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -58,7 +58,7 @@
         job.setVertexClass(PageRankVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -70,7 +70,7 @@
         job.setVertexClass(ShortestPathsVertex.class);
         job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
-        job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -78,12 +78,23 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generatePageRankJobRealNoCombiner(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void generateConnectedComponentsJobReal(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(ConnectedComponentsVertex.class);
         job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
         job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
-        job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -95,7 +106,7 @@
         job.setVertexClass(ConnectedComponentsVertex.class);
         job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
         job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
-        job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
         job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -106,7 +117,7 @@
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(PageRankVertex.class);
         job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-        job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
@@ -118,13 +129,14 @@
         generatePageRankJob("PageRank", outputBase + "PageRank.xml");
         generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
         generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
+        generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
     }
 
     private static void generateShortestPathJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(ShortestPathsVertex.class);
         job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
-        job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+        job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
         job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
         FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
         FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 2e01d71..89bce34 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -30,7 +30,6 @@
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
 import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -69,14 +68,14 @@
         job.setJobName(jobName);
         this.resultFileName = resultFile;
         this.expectedFileName = expectedFile;
-        giraphJobGens = new JobGen[4];
+        giraphJobGens = new JobGen[3];
         giraphJobGens[0] = new JobGenOuterJoin(job);
         waitawhile();
         giraphJobGens[1] = new JobGenInnerJoin(job);
         waitawhile();
         giraphJobGens[2] = new JobGenOuterJoinSort(job);
-        waitawhile();
-        giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+        //waitawhile();
+        // giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
     }
 
     private void waitawhile() throws InterruptedException {
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index e953261..22ae6cf 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index ffff302..50662f9 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 5eba09a..e425b38 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 76905cd..b51bd98 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 3f8a04f..a9e43bd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -124,7 +124,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index f9c4bcd..3719247 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index c415cba..90caf6b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -125,7 +125,6 @@
 <property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
 <property><name>hadoop.logfile.size</name><value>10000000</value></property>
 <property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
 <property><name>mapred.job.queue.name</name><value>default</value></property>
 <property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
 <property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 071bc17..6962aa5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -40,6 +40,7 @@
 public class AggregationFunction implements IAggregateFunction {
     private final Configuration conf;
     private final boolean isFinalStage;
+    private final boolean partialAggAsInput;
     private final DataOutput output;
     private MessageCombiner combiner;
     private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
@@ -52,16 +53,17 @@
     private MsgList msgList = new MsgList();
     private boolean keyRead = false;
 
-    public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
-            throws HyracksDataException {
+    public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage,
+            boolean partialAggAsInput) throws HyracksDataException {
         this.conf = confFactory.createConfiguration();
         this.output = output;
         this.isFinalStage = isFinalStage;
+        this.partialAggAsInput = partialAggAsInput;
         msgList.setConf(this.conf);
 
         combiner = BspUtils.createMessageCombiner(conf);
         key = BspUtils.createVertexIndex(conf);
-        value = !isFinalStage ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+        value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
     }
 
     @Override
@@ -90,7 +92,7 @@
                 keyRead = true;
             }
             value.readFields(valueInput);
-            if (!isFinalStage) {
+            if (!partialAggAsInput) {
                 combiner.step(key, value);
             } else {
                 combiner.step(value);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index b6402ef..a09f688 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -27,15 +27,17 @@
     private static final long serialVersionUID = 1L;
     private final IConfigurationFactory confFactory;
     private final boolean isFinalStage;
+    private final boolean partialAggAsInput;
 
-    public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
+    public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage, boolean partialAggAsInput) {
         this.confFactory = confFactory;
         this.isFinalStage = isFinalStage;
+        this.partialAggAsInput = partialAggAsInput;
     }
 
     @Override
     public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
         DataOutput output = provider.getDataOutput();
-        return new AggregationFunction(confFactory, output, isFinalStage);
+        return new AggregationFunction(confFactory, output, isFinalStage, partialAggAsInput);
     }
 }