Add support for jobs that do not configure a message combiner
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2077 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index e24439f..c78327e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -121,7 +121,7 @@
* @param vertexCombinerClass
* Determines how vertex messages are combined
*/
- final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+ final public void setMessageCombinerClass(Class<?> vertexCombinerClass) {
getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
index 265fd3e..d2ba28d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -141,7 +141,9 @@
used = 0;
this.clear();
int numValues = in.readInt(); // read number of values
- ensureCapacity(numValues);
+ if (numValues > 100) {
+ System.out.println("num values: " + numValues);
+ }
for (int i = 0; i < numValues; i++) {
M value = allocateValue();
value.readFields(in); // read a value
@@ -151,6 +153,9 @@
public void write(DataOutput out) throws IOException {
int numValues = size();
+ if (numValues > 100) {
+ System.out.println("write num values: " + numValues);
+ }
out.writeInt(numValues); // write number of values
for (int i = 0; i < numValues; i++) {
get(i).write(out);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index e94331d..1ff8917 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -121,7 +121,7 @@
conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
- Class aggregatorClass = conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS, GlobalAggregator.class);
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
if (!aggregatorClass.equals(GlobalAggregator.class)) {
List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
Type partialAggregateValueType = argTypes.get(4);
@@ -131,7 +131,7 @@
conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
}
- Class combinerClass = conf.getClass(PregelixJob.Message_COMBINER_CLASS, MessageCombiner.class);
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
if (!combinerClass.equals(MessageCombiner.class)) {
List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
Type partialCombineValueType = argTypes.get(2);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 45d3687..00cdf07 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -180,7 +180,8 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -188,8 +189,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -348,7 +349,8 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -356,8 +358,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 8252555..3847aa7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -146,7 +146,7 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -157,7 +157,7 @@
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ .getAccumulatingAggregatorFactory(conf, true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -306,7 +306,7 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -315,7 +315,7 @@
* construct global group-by operator
*/
IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ .getAccumulatingAggregatorFactory(conf, true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 68ea01f..ec783a7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -149,8 +149,8 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, false);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -297,8 +297,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, false);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 0792e1d..bb939e3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -143,7 +143,8 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -160,8 +161,8 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -309,7 +310,8 @@
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -324,8 +326,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index aa389e2..bcf3ffc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -71,9 +71,10 @@
return rdFactory;
}
- public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal) {
+ public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal,
+ boolean partialAggAsInput) {
IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
- isFinal);
+ isFinal, partialAggAsInput);
IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
new IAggregateFunctionFactory[] { aggFuncFactory });
return aggregatorFactory;
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index c0e87d3..b51571c 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -120,7 +120,7 @@
job.setVertexClass(ConnectedComponentsVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
- job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 0a6402c..7e21245 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -207,7 +207,7 @@
job.setVertexClass(PageRankVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index b039c08..89ea951 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -132,7 +132,7 @@
job.setVertexClass(ShortestPathsVertex.class);
job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
job.getConfiguration().setLong(SOURCE_ID, 0);
Client.run(args, job);
}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index fb7dc6c..99fec55 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -46,7 +46,7 @@
job.setVertexClass(PageRankVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -58,7 +58,7 @@
job.setVertexClass(PageRankVertex.class);
job.setVertexInputFormatClass(TextPageRankInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -70,7 +70,7 @@
job.setVertexClass(ShortestPathsVertex.class);
job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
- job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -78,12 +78,23 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+ private static void generatePageRankJobRealNoCombiner(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
private static void generateConnectedComponentsJobReal(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(ConnectedComponentsVertex.class);
job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
- job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
@@ -95,7 +106,7 @@
job.setVertexClass(ConnectedComponentsVertex.class);
job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
- job.setVertexCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
@@ -106,7 +117,7 @@
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(PageRankVertex.class);
job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- job.setVertexCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
@@ -118,13 +129,14 @@
generatePageRankJob("PageRank", outputBase + "PageRank.xml");
generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
+ generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
}
private static void generateShortestPathJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(ShortestPathsVertex.class);
job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- job.setVertexCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+ job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 2e01d71..89bce34 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -30,7 +30,6 @@
import edu.uci.ics.pregelix.core.jobgen.JobGen;
import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -69,14 +68,14 @@
job.setJobName(jobName);
this.resultFileName = resultFile;
this.expectedFileName = expectedFile;
- giraphJobGens = new JobGen[4];
+ giraphJobGens = new JobGen[3];
giraphJobGens[0] = new JobGenOuterJoin(job);
waitawhile();
giraphJobGens[1] = new JobGenInnerJoin(job);
waitawhile();
giraphJobGens[2] = new JobGenOuterJoinSort(job);
- waitawhile();
- giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+ //waitawhile();
+ // giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
}
private void waitawhile() throws InterruptedException {
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index e953261..22ae6cf 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index ffff302..50662f9 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index 5eba09a..e425b38 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index 76905cd..b51bd98 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 3f8a04f..a9e43bd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -124,7 +124,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index f9c4bcd..3719247 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index c415cba..90caf6b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -125,7 +125,6 @@
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
-<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.api.util.GlobalCountAggregator</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 071bc17..6962aa5 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -40,6 +40,7 @@
public class AggregationFunction implements IAggregateFunction {
private final Configuration conf;
private final boolean isFinalStage;
+ private final boolean partialAggAsInput;
private final DataOutput output;
private MessageCombiner combiner;
private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
@@ -52,16 +53,17 @@
private MsgList msgList = new MsgList();
private boolean keyRead = false;
- public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
- throws HyracksDataException {
+ public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage,
+ boolean partialAggAsInput) throws HyracksDataException {
this.conf = confFactory.createConfiguration();
this.output = output;
this.isFinalStage = isFinalStage;
+ this.partialAggAsInput = partialAggAsInput;
msgList.setConf(this.conf);
combiner = BspUtils.createMessageCombiner(conf);
key = BspUtils.createVertexIndex(conf);
- value = !isFinalStage ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+ value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
}
@Override
@@ -90,7 +92,7 @@
keyRead = true;
}
value.readFields(valueInput);
- if (!isFinalStage) {
+ if (!partialAggAsInput) {
combiner.step(key, value);
} else {
combiner.step(value);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index b6402ef..a09f688 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -27,15 +27,17 @@
private static final long serialVersionUID = 1L;
private final IConfigurationFactory confFactory;
private final boolean isFinalStage;
+ private final boolean partialAggAsInput;
- public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
+ public AggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage, boolean partialAggAsInput) {
this.confFactory = confFactory;
this.isFinalStage = isFinalStage;
+ this.partialAggAsInput = partialAggAsInput;
}
@Override
public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
DataOutput output = provider.getDataOutput();
- return new AggregationFunction(confFactory, output, isFinalStage);
+ return new AggregationFunction(confFactory, output, isFinalStage, partialAggAsInput);
}
}