revise benchmark input/output format
diff --git a/pregelix/pregelix-benchmark/pom.xml b/pregelix/pregelix-benchmark/pom.xml
index df262df..c1344ea 100644
--- a/pregelix/pregelix-benchmark/pom.xml
+++ b/pregelix/pregelix-benchmark/pom.xml
@@ -25,6 +25,25 @@
<fork>true</fork>
</configuration>
</plugin>
+ <!-- Runs assembly:single in the package phase using the jar-with-dependencies descriptor -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-my-jar-with-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -43,6 +62,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.giraph</groupId>
+ <artifactId>giraph-examples</artifactId>
+ <version>1.0.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-hdfs-core</artifactId>
<version>0.2.10-SNAPSHOT</version>
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
index 6a088df..b290907 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.pregelix.benchmark.io;
import java.io.IOException;
@@ -10,13 +25,13 @@
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextCCInputFormat extends TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable> {
+public class TextCCInputFormat extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -24,25 +39,25 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
items = line.toString().split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(items[0]));
}
@Override
- protected VLongWritable getValue(Text line) throws IOException {
+ protected LongWritable getValue(Text line) throws IOException {
return null;
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+ Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
edge.setEntry(entry);
edge.setValue(null);
edges.add(edge);
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat2.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat2.java
deleted file mode 100644
index 45b4614..0000000
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCInputFormat2.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package edu.uci.ics.pregelix.benchmark.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.MapMutableEdge;
-import org.apache.giraph.io.formats.TextVertexInputFormat;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class TextCCInputFormat2 extends TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable> {
-
- @Override
- public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextVertexReaderFromEachLine() {
- String[] items;
-
- @Override
- protected VLongWritable getId(Text line) throws IOException {
- String[] kv = line.toString().split("\t");
- items = kv[1].split(" ");
- return new VLongWritable(Long.parseLong(kv[0]));
- }
-
- @Override
- protected VLongWritable getValue(Text line) throws IOException {
- return null;
- }
-
- @Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
- for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
- }
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
- edge.setEntry(entry);
- edge.setValue(null);
- edges.add(edge);
- }
- return edges;
- }
-
- };
- }
-
-}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java
index 7aa71c1..770c6e1 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextCCOutputFormat.java
@@ -1,22 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.pregelix.benchmark.io;
import java.io.IOException;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextCCOutputFormat extends TextVertexOutputFormat<VLongWritable, VLongWritable, FloatWritable> {
+public class TextCCOutputFormat extends TextVertexOutputFormat<LongWritable, LongWritable, NullWritable> {
@Override
public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new TextVertexWriterToEachLine() {
@Override
- protected Text convertVertexToLine(Vertex<VLongWritable, VLongWritable, FloatWritable, ?> vertex)
+ protected Text convertVertexToLine(Vertex<LongWritable, LongWritable, NullWritable, ?> vertex)
throws IOException {
return new Text(vertex.getId() + " " + vertex.getValue());
}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
similarity index 66%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
index b4e984f..38eef3a 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPRInputFormat.java
@@ -25,13 +25,13 @@
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextPRInputFormat extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,9 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
items = line.toString().split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(items[0]));
}
@Override
@@ -50,14 +50,14 @@
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+ Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
edge.setEntry(entry);
edge.setValue(null);
edges.add(edge);
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java
new file mode 100644
index 0000000..b14de6f
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPROutputFormat.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.io;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TextPROutputFormat extends TextVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {
+    /** Returns a per-line writer whose line format is the vertex id, a space, and the vertex value. */
+    @Override
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new TextVertexWriterToEachLine() {
+            /** Formats one vertex as "id value"; a null id or value is rendered as "null". */
+            @Override
+            protected Text convertVertexToLine(Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+                    throws IOException {
+                StringBuilder line = new StringBuilder();
+                line.append(vertex.getId()).append(" ").append(vertex.getValue());
+                return new Text(line.toString());
+            }
+        };
+    }
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankOutputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankOutputFormat.java
deleted file mode 100644
index c7bbd76..0000000
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankOutputFormat.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package edu.uci.ics.pregelix.benchmark.io;
-
-import java.io.IOException;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class TextPageRankOutputFormat extends TextVertexOutputFormat<VLongWritable, DoubleWritable, FloatWritable> {
-
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new TextVertexWriterToEachLine() {
-
- @Override
- protected Text convertVertexToLine(Vertex<VLongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException {
- return new Text(vertex.getId() + " " + vertex.getValue());
- }
-
- };
- }
-
-}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
similarity index 67%
rename from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java
rename to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
index b4e984f..953e93c 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextSPInputFormat.java
@@ -12,6 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package edu.uci.ics.pregelix.benchmark.io;
import java.io.IOException;
@@ -25,13 +26,12 @@
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextPageRankInputFormat extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextSPInputFormat extends TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,9 +39,9 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
items = line.toString().split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(items[0]));
}
@Override
@@ -50,16 +50,16 @@
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, DoubleWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, DoubleWritable>> edges = new ArrayList<Edge<LongWritable, DoubleWritable>>();
+ Map<LongWritable, DoubleWritable> edgeMap = new HashMap<LongWritable, DoubleWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, DoubleWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, DoubleWritable> edge = new MapMutableEdge<LongWritable, DoubleWritable>();
edge.setEntry(entry);
- edge.setValue(null);
+ edge.setValue(new DoubleWritable(1.0));
edges.add(edge);
}
return edges;
@@ -67,4 +67,5 @@
};
}
+
}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java
similarity index 61%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java
index 49c3241..0a70b3c 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextCCInputFormat2.java
@@ -12,7 +12,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.benchmark.io;
+
+package edu.uci.ics.pregelix.benchmark.io2;
import java.io.IOException;
import java.util.ArrayList;
@@ -24,14 +25,13 @@
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextPageRankInputFormat2 extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextCCInputFormat2 extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,26 +39,26 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
String[] kv = line.toString().split("\t");
items = kv[1].split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(kv[0]));
}
@Override
- protected DoubleWritable getValue(Text line) throws IOException {
+ protected LongWritable getValue(Text line) throws IOException {
return null;
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+ Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
edge.setEntry(entry);
edge.setValue(null);
edges.add(edge);
@@ -68,4 +68,5 @@
};
}
+
}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
similarity index 65%
copy from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
copy to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
index 49c3241..63a4519 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextPRInputFormat2.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.benchmark.io;
+package edu.uci.ics.pregelix.benchmark.io2;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,13 +25,13 @@
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextPageRankInputFormat2 extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextPRInputFormat2 extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,10 +39,10 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
String[] kv = line.toString().split("\t");
items = kv[1].split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(items[0]));
}
@Override
@@ -51,14 +51,14 @@
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, NullWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, NullWritable>> edges = new ArrayList<Edge<LongWritable, NullWritable>>();
+ Map<LongWritable, NullWritable> edgeMap = new HashMap<LongWritable, NullWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, NullWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, NullWritable> edge = new MapMutableEdge<LongWritable, NullWritable>();
edge.setEntry(entry);
edge.setValue(null);
edges.add(edge);
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
similarity index 66%
rename from pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
rename to pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
index 49c3241..fdb1061 100644
--- a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io/TextPageRankInputFormat2.java
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/io2/TextSPInputFormat2.java
@@ -12,7 +12,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.benchmark.io;
+
+package edu.uci.ics.pregelix.benchmark.io2;
import java.io.IOException;
import java.util.ArrayList;
@@ -25,13 +26,12 @@
import org.apache.giraph.edge.MapMutableEdge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-public class TextPageRankInputFormat2 extends TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+public class TextSPInputFormat2 extends TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
@@ -39,10 +39,10 @@
String[] items;
@Override
- protected VLongWritable getId(Text line) throws IOException {
+ protected LongWritable getId(Text line) throws IOException {
String[] kv = line.toString().split("\t");
items = kv[1].split(" ");
- return new VLongWritable(Long.parseLong(items[0]));
+ return new LongWritable(Long.parseLong(kv[0]));
}
@Override
@@ -51,16 +51,16 @@
}
@Override
- protected Iterable<Edge<VLongWritable, FloatWritable>> getEdges(Text line) throws IOException {
- List<Edge<VLongWritable, FloatWritable>> edges = new ArrayList<Edge<VLongWritable, FloatWritable>>();
- Map<VLongWritable, FloatWritable> edgeMap = new HashMap<VLongWritable, FloatWritable>();
+ protected Iterable<Edge<LongWritable, DoubleWritable>> getEdges(Text line) throws IOException {
+ List<Edge<LongWritable, DoubleWritable>> edges = new ArrayList<Edge<LongWritable, DoubleWritable>>();
+ Map<LongWritable, DoubleWritable> edgeMap = new HashMap<LongWritable, DoubleWritable>();
for (int i = 1; i < items.length; i++) {
- edgeMap.put(new VLongWritable(Long.parseLong(items[i])), null);
+ edgeMap.put(new LongWritable(Long.parseLong(items[i])), null);
}
- for (Entry<VLongWritable, FloatWritable> entry : edgeMap.entrySet()) {
- MapMutableEdge<VLongWritable, FloatWritable> edge = new MapMutableEdge<VLongWritable, FloatWritable>();
+ for (Entry<LongWritable, DoubleWritable> entry : edgeMap.entrySet()) {
+ MapMutableEdge<LongWritable, DoubleWritable> edge = new MapMutableEdge<LongWritable, DoubleWritable>();
edge.setEntry(entry);
- edge.setValue(null);
+ edge.setValue(new DoubleWritable(1.0));
edges.add(edge);
}
return edges;
@@ -68,4 +68,5 @@
};
}
+
}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java
new file mode 100644
index 0000000..3789d6d
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ConnectedComponentsVertex.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+public class ConnectedComponentsVertex extends Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
+    /**
+     * Propagates the smallest vertex id to all neighbors. Will always choose to
+     * halt and only reactivate if a smaller id has been sent to it.
+     * The label is seeded from the vertex id in superstep 0, not read from the loaded
+     * value: the CC input formats in this patch return {@code null} from getValue.
+     *
+     * @param messages
+     *            Iterator of messages from the previous superstep.
+     * @throws IOException
+     */
+    @Override
+    public void compute(Iterable<LongWritable> messages) throws IOException {
+        // First superstep is special, because we can simply look at the neighbors
+        if (getSuperstep() == 0) {
+            long ownId = getId().get();
+            long smallest = ownId;
+            for (Edge<LongWritable, NullWritable> edge : getEdges()) {
+                smallest = Math.min(smallest, edge.getTargetVertexId().get());
+            }
+            // Always store a label so later supersteps never read a missing value
+            setValue(new LongWritable(smallest));
+            // Only need to send value if it is not the own id
+            if (smallest != ownId) {
+                for (Edge<LongWritable, NullWritable> edge : getEdges()) {
+                    LongWritable neighbor = edge.getTargetVertexId();
+                    if (neighbor.get() > smallest) {
+                        sendMessage(neighbor, getValue());
+                    }
+                }
+            }
+            voteToHalt();
+            return;
+        }
+
+        long currentComponent = getValue().get();
+        boolean changed = false;
+        // did we get a smaller id ?
+        for (LongWritable message : messages) {
+            long candidateComponent = message.get();
+            if (candidateComponent < currentComponent) {
+                currentComponent = candidateComponent;
+                changed = true;
+            }
+        }
+
+        // propagate new component id to the neighbors
+        if (changed) {
+            setValue(new LongWritable(currentComponent));
+            sendMessageToAllEdges(getValue());
+        }
+        voteToHalt();
+    }
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java
new file mode 100644
index 0000000..86e90dd
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/PageRankVertex.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.examples.RandomWalkVertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * PageRank expressed as a random walk: the transition probability is the same
+ * for every outgoing edge of a vertex.
+ * http://en.wikipedia.org/wiki/PageRank
+ */
+public class PageRankVertex extends RandomWalkVertex<NullWritable> {
+
+    /** Splits the state probability evenly across this vertex's edges. */
+    @Override
+    protected double transitionProbability(double stateProbability, Edge<LongWritable, NullWritable> edge) {
+        return stateProbability / getNumEdges();
+    }
+
+    /** Recomputes the rank from the received partial ranks. */
+    @Override
+    protected double recompute(Iterable<DoubleWritable> partialRanks, double teleportationProbability) {
+        final double received = MathUtils.sum(partialRanks);
+        // getDanglingProbability() spread uniformly over all vertices
+        final double danglingShare = getDanglingProbability() / getTotalNumVertices();
+        final double followProbability = 1d - teleportationProbability;
+        final double teleportShare = teleportationProbability / getTotalNumVertices();
+        return followProbability * (received + danglingShare) + teleportShare;
+    }
+}
diff --git a/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java
new file mode 100644
index 0000000..755a3d0
--- /dev/null
+++ b/pregelix/pregelix-benchmark/src/main/java/edu/uci/ics/pregelix/benchmark/vertex/ShortestPathsVertex.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.benchmark.vertex;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Shortest paths algorithm.
+ */
+public class ShortestPathsVertex extends Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+ /** Source id. */
+ public static final String SOURCE_ID = "giraph.shortestPathsBenchmark.sourceId";
+ /** Default source id. */
+ public static final long SOURCE_ID_DEFAULT = 1;
+
+ private boolean isSource() {
+ return getId().get() == getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
+
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() == 0) {
+ setValue(new DoubleWritable(Double.MAX_VALUE));
+ }
+
+ double minDist = isSource() ? 0d : Double.MAX_VALUE;
+ for (DoubleWritable message : messages) {
+ minDist = Math.min(minDist, message.get());
+ }
+
+ if (minDist < getValue().get()) {
+ setValue(new DoubleWritable(minDist));
+ for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
+ double distance = minDist + edge.getValue().get();
+ sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
+ }
+ }
+
+ voteToHalt();
+ }
+}