necessary modifications for adding a global aggregator
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2065 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 754c76e..61c2cae 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -30,29 +31,31 @@
*/
public class PregelixJob extends Job {
/** Vertex class - required */
- public static final String VERTEX_CLASS = "giraph.vertexClass";
+ public static final String VERTEX_CLASS = "pregelix.vertexClass";
/** VertexInputFormat class - required */
- public static final String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+ public static final String VERTEX_INPUT_FORMAT_CLASS = "pregelix.vertexInputFormatClass";
/** VertexOutputFormat class - optional */
- public static final String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+ public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
/** Vertex combiner class - optional */
- public static final String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+ public static final String VERTEX_COMBINER_CLASS = "pregelix.combinerClass";
+ /** Global aggregator class - optional */
+ public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
/** Vertex resolver class - optional */
- public static final String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+ public static final String VERTEX_RESOLVER_CLASS = "pregelix.vertexResolverClass";
/** Vertex index class */
- public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+ public static final String VERTEX_INDEX_CLASS = "pregelix.vertexIndexClass";
/** Vertex value class */
- public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+ public static final String VERTEX_VALUE_CLASS = "pregelix.vertexValueClass";
/** Edge value class */
- public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+ public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
/** Message value class */
- public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
- /** Aggregator class */
- public static final String AGGREGATOR_NAME = "giraph.aggregatorClass";
+ public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
+ /** Aggregate value class */
+ public static final String AGGREGATE_VALUE_CLASS = "pregelix.aggregateValueClass";
/** num of vertices */
- public static final String NUM_VERTICE = "giraph.numVertices";
+ public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
- public static final String NUM_EDGES = "giraph.numEdges";
+ public static final String NUM_EDGES = "pregelix.numEdges";
/**
* Constructor that will instantiate the configuration
@@ -117,4 +120,14 @@
final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
getConfiguration().setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, VertexCombiner.class);
}
+
+ /**
+ * Set the global aggregator class (optional)
+ *
+ * @param globalAggregatorClass
+ *            Determines how vertex values are aggregated globally
+ */
+ final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
+ getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
+ }
}
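
For orientation, a minimal sketch of how a job might define and register a global aggregator through the new setter. GlobalAggregator itself is not shown in this patch; it is treated here as an extensible base class, its five type parameters and the step(vertex)/finish() methods are assumed from how BspUtils, JobGen and GlobalAggregationFunction use it below (any additional abstract methods are omitted), and VertexCountAggregator plus the job-setup class are hypothetical examples, not part of this change.

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
    import edu.uci.ics.pregelix.api.graph.Vertex;
    import edu.uci.ics.pregelix.api.job.PregelixJob;

    // Hypothetical aggregator that counts vertices. The type parameters are assumed
    // to be <I, V, E, M, T>, with T the aggregate value type (argTypes.get(4) in JobGen).
    public class VertexCountAggregator extends
            GlobalAggregator<LongWritable, DoubleWritable, FloatWritable, DoubleWritable, LongWritable> {
        private long count = 0;

        @Override
        public void step(Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex) {
            count++;
        }

        @Override
        public LongWritable finish() {
            return new LongWritable(count);
        }
    }

    // Job setup: register the aggregator alongside the usual vertex classes,
    // assuming the name-only PregelixJob constructor referenced in this file.
    public class JobSetupSketch {
        public static PregelixJob buildJob() throws IOException {
            PregelixJob job = new PregelixJob("PageRank with vertex count");
            job.setGlobalAggregatorClass(VertexCountAggregator.class);
            return job;
        }
    }
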
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 52909d5..2a25bdb 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.graph.VertexCombiner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -109,6 +110,20 @@
}
/**
+ * Get the user's subclassed {@link GlobalAggregator}.
+ *
+ * @param conf
+ * Configuration to check
+ * @return User's global aggregator class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> Class<? extends GlobalAggregator<I, V, E, M, T>> getGlobalAggregatorClass(
+ Configuration conf) {
+ return (Class<? extends GlobalAggregator<I, V, E, M, T>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
+ null, GlobalAggregator.class);
+ }
+
+ /**
* Create a user vertex combiner class
*
* @param conf
@@ -123,6 +138,20 @@
}
/**
+ * Create a user global aggregator
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user global aggregator
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, T extends Writable> GlobalAggregator<I, V, E, M, T> createGlobalAggregator(
+ Configuration conf) {
+ Class<? extends GlobalAggregator<I, V, E, M, T>> globalAggregatorClass = getGlobalAggregatorClass(conf);
+ return ReflectionUtils.newInstance(globalAggregatorClass, conf);
+ }
+
+ /**
* Get the user's subclassed Vertex.
*
* @param conf
@@ -258,6 +287,20 @@
}
/**
+ * Get the user's global aggregate value class.
+ *
+ * @param conf
+ * Configuration to check
+ * @return User's global aggregate value class
+ */
+ @SuppressWarnings("unchecked")
+ public static <M extends Writable> Class<M> getAggregateValueClass(Configuration conf) {
+ if (conf == null)
+ conf = defaultConf;
+ return (Class<M>) conf.getClass(PregelixJob.AGGREGATE_VALUE_CLASS, Writable.class);
+ }
+
+ /**
* Create a user vertex message value
*
* @param conf
@@ -274,4 +317,22 @@
throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
}
}
+
+ /**
+ * Create a user aggregate value
+ *
+ * @param conf
+ * Configuration to check
+ * @return Instantiated user aggregate value
+ */
+ public static <M extends Writable> M createAggregateValue(Configuration conf) {
+ Class<M> aggregateValueClass = getAggregateValueClass(conf);
+ try {
+ return aggregateValueClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+ }
+ }
}
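
A brief sketch of how runtime code can use the new BspUtils helpers, assuming a global aggregator class has been registered on the job and the aggregate value class has been recorded in the configuration (as JobGen does below); it mirrors the calls GlobalAggregationFunction makes later in this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;

    import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
    import edu.uci.ics.pregelix.api.util.BspUtils;

    public class AggregatorBootstrapSketch {
        @SuppressWarnings("rawtypes")
        public static void bootstrap(Configuration conf) {
            // Instantiate the user-registered aggregator; with no class registered,
            // createGlobalAggregator would be handed a null class and fail.
            GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
            // Create an empty aggregate value of the configured concrete type.
            Writable aggregateValue = BspUtils.createAggregateValue(conf);
        }
    }
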
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 38d004d..e6007f9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -59,6 +59,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -118,6 +119,13 @@
conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+
+ Class aggregatorClass = conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS, GlobalAggregator.class);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+ Type aggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.AGGREGATE_VALUE_CLASS, (Class<?>) aggregateValueType, Writable.class);
+ }
}
public String getJobId() {
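
The hunk above uses ReflectionUtils.getTypeArguments, a reflection utility not shown in this patch, to recover the aggregate value type (the fifth type argument of GlobalAggregator) from the user's concrete aggregator class. A rough, self-contained illustration of the underlying mechanism with plain java.lang.reflect follows; it only inspects the direct generic superclass and assumes GlobalAggregator is a class rather than an interface, whereas the real utility presumably resolves arguments through the whole type hierarchy.

    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;

    public class TypeArgumentSketch {
        // For a declaration like
        //     class MyAggregator extends GlobalAggregator<I, V, E, M, LongWritable>
        // this returns [I, V, E, M, LongWritable]; index 4 is the aggregate value type.
        static Type[] directSuperclassTypeArguments(Class<?> concreteClass) {
            Type superType = concreteClass.getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments();
            }
            return new Type[0];
        }
    }
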
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
index fc2ab0a..22ae6cf 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsReal.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>20</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -120,10 +118,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
index 1a021c2..50662f9 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ConnectedComponentsRealComplex.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>23</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -120,10 +118,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ConnectedComponentsVertex$SimpleConnectedComponentsVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
index e5294b7..e425b38 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRank.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>20</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -120,10 +118,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
index a3d0551..b51bd98 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankReal.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>20</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -120,10 +118,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
index 0aa829b..a9e43bd 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealComplex.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>23</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -120,10 +118,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
index bae8cec..3719247 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPaths.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>20</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -121,10 +119,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
index 954f50e..90caf6b 100644
--- a/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ShortestPathsReal.xml
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
-<property><name>giraph.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
@@ -19,7 +18,6 @@
<property><name>io.bytes.per.checksum</name><value>512</value></property>
<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
-<property><name>giraph.numVertices</name><value>20</value></property>
<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
@@ -31,7 +29,6 @@
<property><name>hadoop.logfile.count</name><value>10</value></property>
<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><name>mapred.output.dir</name><value>/result</value></property>
-<property><name>giraph.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
<property><name>io.map.index.skip</name><value>0</value></property>
<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
<property><name>mapred.output.compress</name><value>false</value></property>
@@ -57,6 +54,7 @@
<property><name>hadoop.security.authorization</name><value>false</value></property>
<property><name>mapred.max.tracker.failures</name><value>4</value></property>
<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
<property><name>mapred.map.tasks</name><value>2</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
@@ -67,6 +65,7 @@
<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
<property><name>mapred.reduce.tasks</name><value>1</value></property>
<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex</value></property>
<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
<property><name>io.file.buffer.size</name><value>4096</value></property>
<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
@@ -87,7 +86,6 @@
<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
<property><name>mapred.job.tracker</name><value>local</value></property>
<property><name>io.skip.checksum.errors</name><value>false</value></property>
-<property><name>giraph.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
<property><name>fs.s3.maxRetries</name><value>4</value></property>
<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
@@ -106,7 +104,7 @@
<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
<property><name>webinterface.private.actions</name><value>false</value></property>
<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
-<property><name>giraph.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ShortestPathsVertex$SimpleMinCombiner</value></property>
<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
@@ -121,10 +119,12 @@
<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
<property><name>job.end.retry.attempts</name><value>0</value></property>
<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat</value></property>
<property><name>mapred.job.queue.name</name><value>default</value></property>
<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
new file mode 100755
index 0000000..a8bea45
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GlobalAggregationFunction implements IAggregateFunction {
+ private final Configuration conf;
+ private final DataOutput output;
+ private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
+ private DataInput valueInput = new DataInputStream(valueInputStream);
+ private GlobalAggregator globalAggregator;
+ private Vertex vertex;
+ private Writable aggregateResult;
+
+ public GlobalAggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage)
+ throws HyracksDataException {
+ this.conf = confFactory.createConfiguration();
+ this.output = output;
+
+ vertex = BspUtils.createVertex(conf);
+ aggregateResult = BspUtils.createAggregateValue(conf);
+ globalAggregator = BspUtils.createGlobalAggregator(conf);
+ }
+
+ @Override
+ public void init() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ FrameTupleReference ftr = (FrameTupleReference) tuple;
+ IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+ ByteBuffer buffer = fta.getBuffer();
+ int tIndex = ftr.getTupleIndex();
+
+ int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+ + fta.getFieldStartOffset(tIndex, 1);
+
+ valueInputStream.setByteBuffer(buffer, valueStart);
+ try {
+ vertex.readFields(valueInput);
+ globalAggregator.step(vertex);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ try {
+ aggregateResult = globalAggregator.finish();
+ aggregateResult.write(output);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
new file mode 100755
index 0000000..519634d
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/GlobalAggregationFunctionFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.runtime.simpleagg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
+
+public class GlobalAggregationFunctionFactory implements IAggregateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
+ private final boolean isFinalStage;
+
+ public GlobalAggregationFunctionFactory(IConfigurationFactory confFactory, boolean isFinalStage) {
+ this.confFactory = confFactory;
+ this.isFinalStage = isFinalStage;
+ }
+
+ @Override
+ public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+ DataOutput output = provider.getDataOutput();
+ return new GlobalAggregationFunction(confFactory, output, isFinalStage);
+ }
+}
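
Finally, a hedged sketch of how the new factory would likely be wired up; the IConfigurationFactory and IDataOutputProvider instances are assumed to come from the enclosing Hyracks operator and are placeholders, not part of this patch.

    import edu.uci.ics.hyracks.api.exceptions.HyracksException;
    import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
    import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
    import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
    import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
    import edu.uci.ics.pregelix.runtime.simpleagg.GlobalAggregationFunctionFactory;

    public class GlobalAggregationWiringSketch {
        public static IAggregateFunction create(IConfigurationFactory confFactory,
                IDataOutputProvider provider) throws HyracksException {
            // isFinalStage is passed through but not yet used by GlobalAggregationFunction.
            IAggregateFunctionFactory factory = new GlobalAggregationFunctionFactory(confFactory, true);
            IAggregateFunction function = factory.createAggregateFunction(provider);
            // The caller invokes function.init(), then function.step(tuple) once per
            // vertex tuple, and finally function.finish() to write the aggregate
            // to the provider's DataOutput.
            return function;
        }
    }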