address Vinayak's code review comments
diff --git a/pregelix/pregelix-benchmark/pom.xml b/pregelix/pregelix-benchmark/pom.xml
new file mode 100644
index 0000000..4d7d456
--- /dev/null
+++ b/pregelix/pregelix-benchmark/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pregelix</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.10-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pregelix-benchmark</artifactId>
+ <name>pregelix-benchmark</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-core</artifactId>
+ <version>1.0.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
new file mode 100644
index 0000000..04c29de
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/PageRankVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+ public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
+ private final DoubleWritable vertexValue = new DoubleWritable();
+ private final DoubleWritable msg = new DoubleWritable();
+ private int maxIteration = -1;
+
+ @Override
+ public void compute(Iterable<DoubleWritable> msgIterator) {
+ if (maxIteration < 0) {
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ }
+ if (getSuperstep() == 1) {
+ vertexValue.set(1.0 / getTotalNumVertices());
+ }
+ if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
+ double sum = 0;
+ for (DoubleWritable msg : msgIterator) {
+ sum += msg.get();
+ }
+ vertexValue.set((0.15 / getTotalNumVertices()) + 0.85 * sum);
+ }
+
+ if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
+ long edges = getNumEdges();
+ msg.set(vertexValue.get() / edges);
+ sendMessageToAllEdges(msg);
+ } else {
+ voteToHalt();
+ }
+ }
+
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
new file mode 100644
index 0000000..3d85f66
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/TextPageRankInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.benchmark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MapMutableEdge;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextVertexReaderFromEachLine() {
+ String[] items;
+
+ @Override
+ protected VLongWritable getId(Text line) throws IOException {
+ items = line.toString().split(" ");
+ return new VLongWritable(Long.parseLong(items[0]));
+ }
+
+ @Override
+ protected DoubleWritable getValue(Text line) throws IOException {
+ return null;
+ }
+
+ @Override
+ protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
+ List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
+ Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ for (int i = 1; i < items.length; i++) {
+ edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ }
+ for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ edge.setEntry(entry);
+ edge.setValue(null);
+ edges.add(edge);
+ }
+ return edges;
+ }
+
+ };
+ }
+}