Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml b/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml
new file mode 100755
index 0000000..0500499
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly><!-- Packages the contents of target/appassembler (bin/ and lib/) as a zip archive and as a plain directory. -->
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory><!-- bin/ and lib/ are placed directly at the archive root -->
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode><!-- mark everything copied into bin/ as executable (rwxr-xr-x) -->
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
new file mode 100644
index 0000000..30e88ea
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel connected components implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ * <p>
+ * The vertex value is a component label. In superstep 1 each vertex takes the smallest id among itself and its
+ * neighbors; in later supersteps it adopts any smaller label it receives. A vertex only messages its neighbors when
+ * its label shrinks, and votes to halt at the end of every superstep.
+ */
+public class ConnectedComponentsVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+    /**
+     * Message combiner that reduces all labels sent to one vertex to their minimum, which is the only label the
+     * receiving vertex needs.
+     */
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, VLongWritable, VLongWritable> {
+        private long min = Long.MAX_VALUE;
+        private VLongWritable agg = new VLongWritable();
+        private MsgList<VLongWritable> msgList;
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
+            min = Math.min(min, msg.get());
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            // reset so no state from a previous aggregation leaks into this one
+            min = Long.MAX_VALUE;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
+            min = Math.min(min, partialAggregate.get());
+        }
+
+        @Override
+        public VLongWritable finishPartial() {
+            agg.set(min);
+            return agg;
+        }
+
+        @Override
+        public MsgList<VLongWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    /** Reusable buffer for outgoing label messages. */
+    private VLongWritable outputValue = new VLongWritable();
+    /** Reusable buffer for updating the vertex value. */
+    private VLongWritable tmpVertexValue = new VLongWritable();
+    /** Label computed in the current superstep; read by {@code sendOutMsgs()}. */
+    private long minID;
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            // initial label: the smallest id among this vertex and its direct neighbors
+            minID = getVertexId().get();
+            List<Edge<VLongWritable, FloatWritable>> edges = getEdges();
+            for (Edge<VLongWritable, FloatWritable> edge : edges) {
+                minID = Math.min(minID, edge.getDestVertexId().get());
+            }
+            tmpVertexValue.set(minID);
+            setVertexValue(tmpVertexValue);
+            sendOutMsgs();
+        } else {
+            // adopt the smallest received label, and propagate it only if it improves on the current one
+            minID = getVertexId().get();
+            while (msgIterator.hasNext()) {
+                minID = Math.min(minID, msgIterator.next().get());
+            }
+            if (minID < getVertexValue().get()) {
+                tmpVertexValue.set(minID);
+                setVertexValue(tmpVertexValue);
+                sendOutMsgs();
+            }
+        }
+        voteToHalt();
+    }
+
+    /** Sends the current label ({@code minID}) to every out-neighbor. */
+    private void sendOutMsgs() {
+        outputValue.set(minID);
+        for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+            sendMsg(edge.getDestVertexId(), outputValue);
+        }
+    }
+
+    /**
+     * Configures and submits the connected-components job; {@code args} are handed to {@link Client#run}.
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getSimpleName());
+        job.setVertexClass(ConnectedComponentsVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+        job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+        Client.run(args, job);
+    }
+
+    /**
+     * Simple VertexWriter that writes each vertex as a text record whose key is the vertex id and whose value is its
+     * component label.
+     */
+    public static class SimpleConnectedComponentsVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, FloatWritable> {
+        public SimpleConnectedComponentsVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * Text output format for connected components, backed by {@link SimpleConnectedComponentsVertexWriter}.
+     */
+    public static class SimpleConnectedComponentsVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleConnectedComponentsVertexWriter(recordWriter);
+        }
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
new file mode 100644
index 0000000..290f90e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Maps;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ * <p>
+ * In superstep 1 every vertex starts with rank {@code 1 / numVertices}. In supersteps 2 through the configured maximum
+ * ({@link #ITERATIONS}, default 10) the rank becomes {@code 0.15 / numVertices + 0.85 * sum(incoming messages)}. Until
+ * the maximum superstep is reached, each vertex sends {@code rank / outDegree} along every out-edge.
+ */
+public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    /** Configuration key holding the number of supersteps to run (default 10). */
+    public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
+    /** Reusable buffer for outgoing rank contributions. */
+    private DoubleWritable outputValue = new DoubleWritable();
+    /** Reusable buffer for updating the vertex value. */
+    private DoubleWritable tmpVertexValue = new DoubleWritable();
+
+    /**
+     * Message combiner that sums all rank contributions sent to one vertex, so only their total is delivered.
+     */
+    public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
+        private double sum = 0.0;
+        private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            sum = 0.0;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+            sum += msg.get();
+        }
+
+        @Override
+        public DoubleWritable finishPartial() {
+            agg.set(sum);
+            return agg;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
+            sum += partialAggregate.get();
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(sum);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator) {
+        int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+        if (getSuperstep() == 1) {
+            tmpVertexValue.set(1.0 / getNumVertices());
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
+            double sum = 0;
+            while (msgIterator.hasNext()) {
+                sum += msgIterator.next().get();
+            }
+            tmpVertexValue.set((0.15 / getNumVertices()) + 0.85 * sum);
+            setVertexValue(tmpVertexValue);
+        }
+
+        if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
+            long edges = getNumOutEdges();
+            if (edges > 0) {
+                // split the rank evenly over the out-edges; skipped for sinks to avoid dividing by zero
+                outputValue.set(getVertexValue().get() / edges);
+                sendMsgToAllEdges(outputValue);
+            }
+        } else {
+            voteToHalt();
+        }
+    }
+
+    /**
+     * Simple VertexReader that supports {@link PageRankVertex}. It generates a ring: every vertex gets a single
+     * out-edge to the vertex with the next id, wrapping around over all splits.
+     */
+    public static class SimplePageRankVertexReader extends
+            GeneratedVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+        /** Class logger */
+        private static final Logger LOG = Logger.getLogger(SimplePageRankVertexReader.class.getName());
+
+        public SimplePageRankVertexReader() {
+            super();
+        }
+
+        @Override
+        public boolean nextVertex() {
+            return totalRecords > recordsRead;
+        }
+
+        @Override
+        public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex()
+                throws IOException {
+            Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex = BspUtils
+                    .createVertex(configuration);
+
+            VLongWritable vertexId = new VLongWritable((inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+            DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+            long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
+            float edgeValue = vertexId.get() * 100f;
+            // a fresh map per vertex; a shared map would give each vertex the edges of all previously read vertices
+            Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
+            edges.put(new VLongWritable(destVertexId), new FloatWritable(edgeValue));
+            vertex.initialize(vertexId, vertexValue, edges, null);
+            ++recordsRead;
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("next: Return vertexId=" + vertex.getVertexId().get() + ", vertexValue="
+                        + vertex.getVertexValue() + ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+            }
+            return vertex;
+        }
+    }
+
+    /**
+     * Simple VertexInputFormat that supports {@link PageRankVertex} by generating vertices with
+     * {@link SimplePageRankVertexReader}.
+     */
+    public static class SimplePageRankVertexInputFormat extends
+            GeneratedVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+        @Override
+        public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+                InputSplit split, TaskAttemptContext context) throws IOException {
+            return new SimplePageRankVertexReader();
+        }
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link PageRankVertex}: writes the vertex id as the key and its rank as the
+     * value.
+     */
+    public static class SimplePageRankVertexWriter extends
+            TextVertexWriter<VLongWritable, DoubleWritable, FloatWritable> {
+        public SimplePageRankVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, DoubleWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * Simple VertexOutputFormat that supports {@link PageRankVertex}, backed by {@link SimplePageRankVertexWriter}.
+     */
+    public static class SimplePageRankVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, DoubleWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimplePageRankVertexWriter(recordWriter);
+        }
+    }
+
+    /**
+     * Configures and submits the PageRank job; {@code args} are handed to {@link Client#run}.
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        Client.run(args, job);
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
new file mode 100644
index 0000000..2f0ca45
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel reachability query implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ * <p>
+ * The vertex value is a bit set: bit {@code 1} marks "reached from the source" and bit {@code 2} marks "reached from the
+ * destination". Both endpoints flood their bit to their neighbors; once a vertex holds both bits (value {@code 3}) the
+ * two floods have met, so the job is force-terminated and a marker file is written that {@link #main} reports on.
+ */
+public class ReachabilityVertex extends Vertex<VLongWritable, ByteWritable, FloatWritable, ByteWritable> {
+
+    /**
+     * Message combiner that ORs together all state bits sent to one vertex.
+     */
+    public static class SimpleReachibilityCombiner extends MessageCombiner<VLongWritable, ByteWritable, ByteWritable> {
+        private ByteWritable agg = new ByteWritable();
+        private MsgList<ByteWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            this.msgList = msgList;
+            agg.set((byte) 0);
+        }
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+            agg.set((byte) (agg.get() | msg.get()));
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, ByteWritable partialAggregate) throws HyracksDataException {
+            agg.set((byte) (agg.get() | partialAggregate.get()));
+        }
+
+        @Override
+        public ByteWritable finishPartial() {
+            return agg;
+        }
+
+        @Override
+        public MsgList<ByteWritable> finishFinal() {
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    /** Reusable buffer for updating the vertex value; {@code sendOutMsgs()} also sends it as the message. */
+    private ByteWritable tmpVertexValue = new ByteWritable();
+
+    /** The source vertex id */
+    public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
+    /** The destination vertex id */
+    public static final String DEST_ID = "ReachibilityVertex.destId";
+    /** Default source vertex id */
+    public static final long SOURCE_ID_DEFAULT = 1;
+    /** Default destination vertex id */
+    public static final long DEST_ID_DEFAULT = 1;
+
+    /**
+     * Is this vertex the source id?
+     *
+     * @param v the vertex id to test
+     * @return true if {@code v} equals the id configured under {@link #SOURCE_ID} (default {@link #SOURCE_ID_DEFAULT})
+     */
+    private boolean isSource(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+    }
+
+    /**
+     * Is this vertex the dest id?
+     *
+     * @param v the vertex id to test
+     * @return true if {@code v} equals the id configured under {@link #DEST_ID} (default {@link #DEST_ID_DEFAULT})
+     */
+    private boolean isDest(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(DEST_ID, DEST_ID_DEFAULT));
+    }
+
+    @Override
+    public void compute(Iterator<ByteWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            boolean isSource = isSource(getVertexId());
+            if (isSource) {
+                tmpVertexValue.set((byte) 1);
+                setVertexValue(tmpVertexValue);
+            }
+            boolean isDest = isDest(getVertexId());
+            if (isDest) {
+                tmpVertexValue.set((byte) 2);
+                setVertexValue(tmpVertexValue);
+            }
+            if (isSource && isDest) {
+                // source and destination coincide: trivially reachable
+                signalTerminate();
+                return;
+            }
+            if (isSource || isDest) {
+                sendOutMsgs();
+            } else {
+                tmpVertexValue.set((byte) 0);
+                setVertexValue(tmpVertexValue);
+            }
+        } else {
+            while (msgIterator.hasNext()) {
+                int msgValue = msgIterator.next().get();
+                if (msgValue < 3) {
+                    int state = getVertexValue().get();
+                    int newState = state | msgValue;
+                    if (state != newState) {
+                        tmpVertexValue.set((byte) newState);
+                        setVertexValue(tmpVertexValue);
+                        if (newState < 3) {
+                            sendOutMsgs();
+                        } else {
+                            // both bits are set: the two floods met, so the destination is reachable
+                            signalTerminate();
+                        }
+                    }
+                } else {
+                    // the combiner already merged both bits into a single message
+                    signalTerminate();
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    /** Requests forced termination of the iteration and records that the destination is reachable. */
+    private void signalTerminate() {
+        Configuration conf = getContext().getConfiguration();
+        try {
+            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+            writeReachibilityResult(conf, true);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Sends the current state bits (held in {@code tmpVertexValue}) to every neighbor. */
+    private void sendOutMsgs() {
+        for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+            sendMsg(edge.getDestVertexId(), tmpVertexValue);
+        }
+    }
+
+    /** Creates the reachability marker file unless some vertex has already created it. */
+    private void writeReachibilityResult(Configuration conf, boolean terminate) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            Path path = getReachibilityResultPath(conf);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                try {
+                    output.writeBoolean(terminate);
+                    output.flush();
+                } finally {
+                    // close even when the write fails so the file-system stream is not leaked
+                    output.close();
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Returns whether the reachability marker file exists, i.e. whether some vertex signalled success. */
+    private static boolean readReachibilityResult(Configuration conf) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            return dfs.exists(getReachibilityResultPath(conf));
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Path of the per-job marker file used by writeReachibilityResult and readReachibilityResult. */
+    private static Path getReachibilityResultPath(Configuration conf) throws IOException {
+        return new Path(IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility");
+    }
+
+    /**
+     * Configures and submits the reachability job, then prints whether the destination was reached.
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ReachabilityVertex.class.getSimpleName());
+        job.setVertexClass(ReachabilityVertex.class);
+        job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+        job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
+        Client.run(args, job);
+        System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
+    }
+
+    /**
+     * Simple VertexWriter that writes the vertex id as the key and its state bits as the value.
+     */
+    public static class SimpleReachibilityVertexWriter extends
+            TextVertexWriter<VLongWritable, ByteWritable, FloatWritable> {
+        public SimpleReachibilityVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, ByteWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * Text output format for reachability, backed by {@link SimpleReachibilityVertexWriter}.
+     */
+    public static class SimpleReachibilityVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, ByteWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, ByteWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleReachibilityVertexWriter(recordWriter);
+        }
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
new file mode 100644
index 0000000..a018f08
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel shortest paths implementation.
+ * <p>
+ * The vertex value is the best known distance from the source vertex ({@link #SOURCE_ID}), initially
+ * {@link Double#MAX_VALUE}. Whenever a vertex finds a shorter distance it stores it and offers
+ * {@code distance + edgeValue} to each out-neighbor; every vertex votes to halt after each superstep.
+ */
+public class ShortestPathsVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+    /**
+     * Message combiner that keeps only the minimum candidate distance sent to one vertex.
+     */
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
+        private double min = Double.MAX_VALUE;
+        private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+            double value = msg.get();
+            if (min > value) {
+                min = value;
+            }
+        }
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        @Override
+        public void init(MsgList msgList) {
+            min = Double.MAX_VALUE;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public DoubleWritable finishPartial() {
+            agg.set(min);
+            return agg;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
+            double value = partialAggregate.get();
+            if (min > value) {
+                min = value;
+            }
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    /** Reusable buffer for outgoing distance candidates. */
+    private DoubleWritable outputValue = new DoubleWritable();
+    /** Reusable buffer for updating the vertex value. */
+    private DoubleWritable tmpVertexValue = new DoubleWritable();
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(ShortestPathsVertex.class.getName());
+    /** Configuration key holding the id of the source vertex */
+    public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
+    /** Source vertex id used when {@link #SOURCE_ID} is not set */
+    public static final long SOURCE_ID_DEFAULT = 1;
+
+    /**
+     * Is this vertex the source id?
+     *
+     * @return true if this vertex's id equals the one configured under {@link #SOURCE_ID}
+     */
+    private boolean isSource() {
+        return (getVertexId().get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+    }
+
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            tmpVertexValue.set(Double.MAX_VALUE);
+            setVertexValue(tmpVertexValue);
+        }
+        // the source is at distance 0; every other vertex only learns a distance through messages
+        double minDist = isSource() ? 0d : Double.MAX_VALUE;
+        while (msgIterator.hasNext()) {
+            minDist = Math.min(minDist, msgIterator.next().get());
+        }
+        // isLoggable honours levels inherited from parent loggers, unlike comparing getLevel() with ==
+        boolean debug = LOG.isLoggable(Level.FINE);
+        if (debug) {
+            LOG.fine("Vertex " + getVertexId() + " got minDist = " + minDist + " vertex value = " + getVertexValue());
+        }
+        if (minDist < getVertexValue().get()) {
+            tmpVertexValue.set(minDist);
+            setVertexValue(tmpVertexValue);
+            for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+                if (debug) {
+                    LOG.fine("Vertex " + getVertexId() + " sent to " + edge.getDestVertexId() + " = "
+                            + (minDist + edge.getEdgeValue().get()));
+                }
+                outputValue.set(minDist + edge.getEdgeValue().get());
+                sendMsg(edge.getDestVertexId(), outputValue);
+            }
+        }
+        voteToHalt();
+    }
+
+    /**
+     * Configures and submits the shortest-paths job from vertex 0; {@code args} are handed to {@link Client#run}.
+     */
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ShortestPathsVertex.class.getSimpleName());
+        job.setVertexClass(ShortestPathsVertex.class);
+        job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+        job.getConfiguration().setLong(SOURCE_ID, 0);
+        Client.run(args, job);
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
new file mode 100644
index 0000000..68b7cca
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Counts the records (lines) of a text input with a MapReduce job and writes the total as text.
+ * <p>
+ * Usage: {@code VertexAggregator <input path> <output path> <number of reduce tasks>}
+ */
+@SuppressWarnings("deprecation")
+public class VertexAggregator {
+
+    /** Emits a count of one for every input line, all under the same (null) key. */
+    public static class MapRecordOnly extends MapReduceBase implements
+            Mapper<LongWritable, Text, NullWritable, LongWritable> {
+        private final NullWritable nullValue = NullWritable.get();
+        private final LongWritable count = new LongWritable(1);
+
+        @Override
+        public void map(LongWritable id, Text inputValue, OutputCollector<NullWritable, LongWritable> output,
+                Reporter reporter) throws IOException {
+            output.collect(nullValue, count);
+        }
+    }
+
+    /** Combiner: pre-sums partial counts before they are sent to the reducer. */
+    public static class CombineRecordOnly extends MapReduceBase implements
+            Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
+        private final NullWritable nullValue = NullWritable.get();
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<LongWritable> inputValue,
+                OutputCollector<NullWritable, LongWritable> output, Reporter reporter) throws IOException {
+            output.collect(nullValue, new LongWritable(sum(inputValue)));
+        }
+    }
+
+    /** Final reducer: sums the partial counts and writes the total as a decimal string. */
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<NullWritable, LongWritable, NullWritable, Text> {
+        private final NullWritable nullValue = NullWritable.get();
+
+        @Override
+        public void reduce(NullWritable inputKey, Iterator<LongWritable> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            output.collect(nullValue, new Text(Long.toString(sum(inputValue))));
+        }
+    }
+
+    /** Adds up all values produced by {@code values}. */
+    private static long sum(Iterator<LongWritable> values) {
+        long total = 0;
+        while (values.hasNext()) {
+            total += values.next().get();
+        }
+        return total;
+    }
+
+    /**
+     * Configures and runs the counting job.
+     *
+     * @param args input path, output path, and number of reduce tasks
+     * @throws IllegalArgumentException if fewer than three arguments are given
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length < 3) {
+            throw new IllegalArgumentException("Usage: " + VertexAggregator.class.getSimpleName()
+                    + " <input path> <output path> <number of reduce tasks>");
+        }
+        JobConf job = new JobConf(VertexAggregator.class);
+
+        job.setJobName(VertexAggregator.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setCombinerClass(CombineRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        // declare the reducer's actual output types instead of relying on the JobConf defaults
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+        JobClient.runJob(job);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
new file mode 100644
index 0000000..1dd6922
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class VertexSorter {
+
+    /**
+     * Keys every input line by the vertex id found in its first space-separated
+     * token, so the shuffle orders the lines by vertex id; the line itself is
+     * passed through unchanged as the value.
+     */
+    public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+        private static final String SEPARATOR = " ";
+
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            String line = inputValue.toString();
+            // only the leading token is needed, so don't split the whole adjacency list
+            int end = line.indexOf(SEPARATOR);
+            String firstToken = end < 0 ? line : line.substring(0, end);
+            LongWritable vertexId = new LongWritable(Long.parseLong(firstToken));
+            output.collect(vertexId, inputValue);
+        }
+    }
+
+    /**
+     * Writes every line of each vertex-id group unchanged under a NullWritable key,
+     * so each reducer's output lists the original lines sorted by vertex id.
+     */
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        private final NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext()) {
+                output.collect(key, inputValue.next());
+            }
+        }
+    }
+
+    /**
+     * Entry point: {@code VertexSorter <input paths> <output path> <number of reducers>}.
+     *
+     * @param args input path(s), output path, number of reduce tasks
+     * @throws IOException if job submission or execution fails
+     * @throws IllegalArgumentException if fewer than three arguments are given
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length < 3) {
+            throw new IllegalArgumentException(
+                    "Usage: VertexSorter <input paths> <output path> <number of reducers>");
+        }
+        JobConf job = new JobConf(VertexSorter.class);
+
+        job.setJobName(VertexSorter.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        // declare what ReduceRecordOnly really emits instead of relying on the
+        // JobConf defaults (LongWritable key, Text value)
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+        JobClient.runJob(job);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
new file mode 100644
index 0000000..597ad6e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.ReachabilityVertex;
+import edu.uci.ics.pregelix.example.ShortestPathsVertex;
+
+public class Client {
+
+    /** Command-line options understood by {@link #run(String[], PregelixJob)}. */
+    private static class Options {
+        @Option(name = "-inputpaths", usage = "comma or semicolon separated input paths", required = true)
+        public String inputPaths;
+
+        @Option(name = "-outputpath", usage = "output path", required = true)
+        public String outputPath;
+
+        @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+        public String ipAddress;
+
+        @Option(name = "-port", usage = "port of cluster controller", required = false)
+        public int port;
+
+        @Option(name = "-plan", usage = "query plan choice", required = false)
+        public Plan planChoice = Plan.OUTER_JOIN;
+
+        @Option(name = "-vnum", usage = "number of vertices", required = false)
+        public long numVertices;
+
+        @Option(name = "-enum", usage = "number of edges", required = false)
+        public long numEdges;
+
+        @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths/reachability only", required = false)
+        public long sourceId;
+
+        @Option(name = "-dest-vertex", usage = "dest vertex id, for reachability only", required = false)
+        public long destId;
+
+        @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+        public long numIteration = -1;
+
+        @Option(name = "-runtime-profiling", usage = "whether to do runtime profiling", required = false)
+        public String profiling = "false";
+    }
+
+    /**
+     * Parses {@code args} into {@code job}'s configuration and runs the job through a
+     * {@link Driver} against the cluster controller given by {@code -ip}/{@code -port},
+     * using the plan from {@code -plan} and the {@code -runtime-profiling} flag.
+     *
+     * @param args command-line arguments (see {@link Options})
+     * @param job the Pregelix job to configure and run
+     * @throws Exception if argument parsing, job setup or job execution fails
+     */
+    public static void run(String[] args, PregelixJob job) throws Exception {
+        Options options = prepareJob(args, job);
+        Driver driver = new Driver(Client.class);
+        driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
+    }
+
+    /**
+     * Parses the command line and copies the options into {@code job}: input and
+     * output paths, vertex/edge counts, source/destination vertex ids, and the
+     * PageRank iteration limit (only when a positive value was given).
+     *
+     * @return the parsed options
+     * @throws CmdLineException if the arguments do not match {@link Options}
+     * @throws IOException if setting the input/output paths fails
+     */
+    private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        // ';' separates path groups here; each group may itself be a comma-separated
+        // list, which FileInputFormat splits on its own.
+        String[] inputs = options.inputPaths.split(";");
+        FileInputFormat.setInputPaths(job, inputs[0]);
+        for (int i = 1; i < inputs.length; i++) {
+            FileInputFormat.addInputPaths(job, inputs[i]);
+        }
+        FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
+        job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+        job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachabilityVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
+        if (options.numIteration > 0) {
+            job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
+        }
+        return options;
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
new file mode 100644
index 0000000..a802403
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Text input format for the connected-components example: each line of the
+ * input becomes one vertex, parsed by TextConnectedComponentsGraphReader.
+ */
+public class TextConnectedComponentsInputFormat extends
+        TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        // use this format's own reader (previously the reachability reader was used by mistake)
+        return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+/**
+ * Parses one text line per vertex: the first space-separated token is the vertex
+ * id and each following token is the id of an out-neighbor (edges carry a null
+ * value). The Vertex instance, its id holder and the neighbor-id objects are
+ * recycled from record to record, so a returned vertex is only valid until the
+ * next call to getCurrentVertex().
+ */
+@SuppressWarnings("rawtypes")
+class TextConnectedComponentsGraphReader extends
+        TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    private static final String SEPARATOR = " ";
+    /** Vertex object reused for every record; created lazily from the job configuration. */
+    private Vertex vertex;
+    /** Reused holder for the current vertex id. */
+    private VLongWritable vertexId = new VLongWritable();
+    /** Recycled neighbor-id objects; the first {@code used} entries belong to the current vertex. */
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextConnectedComponentsGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    /**
+     * Advances to the next line that contains a vertex. Blank or whitespace-only
+     * lines are skipped: parsing them would either throw NumberFormatException or
+     * (for a line of spaces) return a vertex whose id was not set from that line.
+     */
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        while (getRecordReader().nextKeyValue()) {
+            if (!isBlank(getRecordReader().getCurrentValue()))
+                return true;
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        // trim() so a leading separator cannot produce an empty (unparsable) id token
+        String[] fields = line.toString().trim().split(SEPARATOR);
+
+        // first token: the vertex id
+        vertexId.set(Long.parseLong(fields[0]));
+        vertex.setVertexId(vertexId);
+
+        // remaining tokens: neighbor ids; skip the empty tokens produced by repeated separators
+        for (int i = 1; i < fields.length; i++) {
+            if (fields[i].length() == 0)
+                continue;
+            VLongWritable destId = allocate();
+            destId.set(Long.parseLong(fields[i]));
+            vertex.addEdge(destId, null);
+        }
+        return vertex;
+    }
+
+    /** Hands out the next recycled VLongWritable, growing the pool when it runs out. */
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            pool.add(new VLongWritable());
+        }
+        return pool.get(used++);
+    }
+
+    /** Returns true when every byte of {@code line} is <= ' ' (the set String.trim() strips). */
+    private static boolean isBlank(Text line) {
+        byte[] bytes = line.getBytes();
+        for (int i = 0; i < line.getLength(); i++) {
+            // mask to unsigned: multi-byte UTF-8 bytes are negative as Java bytes
+            if ((bytes[i] & 0xff) > ' ')
+                return false;
+        }
+        return true;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
new file mode 100644
index 0000000..a8a752e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextPageRankInputFormat extends
+ TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextPageRankGraphReader extends TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+ private final static String separator = " ";
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+
+ public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String[] fields = line.toString().split(separator);
+
+ if (fields.length > 0) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(fields[0]);
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ for (int i = 1; i < fields.length; i++) {
+ dest = Long.parseLong(fields[i]);
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, null);
+ }
+ }
+ // vertex.sortEdges();
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
new file mode 100644
index 0000000..9ef1d49
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Text input format for the reachability example: each line of the input
+ * becomes one vertex, parsed by TextReachibilityGraphReader.
+ */
+public class TextReachibilityVertexInputFormat extends
+        TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        // use this format's own reader (previously the connected-components reader was used by mistake)
+        return new TextReachibilityGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+/**
+ * Parses one text line per vertex: the first space-separated token is the vertex
+ * id and each following token is the id of an out-neighbor (edges carry a null
+ * value). The Vertex instance, its id holder and the neighbor-id objects are
+ * recycled from record to record, so a returned vertex is only valid until the
+ * next call to getCurrentVertex().
+ */
+@SuppressWarnings("rawtypes")
+class TextReachibilityGraphReader extends
+        TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    private static final String SEPARATOR = " ";
+    /** Vertex object reused for every record; created lazily from the job configuration. */
+    private Vertex vertex;
+    /** Reused holder for the current vertex id. */
+    private VLongWritable vertexId = new VLongWritable();
+    /** Recycled neighbor-id objects; the first {@code used} entries belong to the current vertex. */
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextReachibilityGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    /**
+     * Advances to the next line that contains a vertex. Blank or whitespace-only
+     * lines are skipped: parsing them would either throw NumberFormatException or
+     * (for a line of spaces) return a vertex whose id was not set from that line.
+     */
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        while (getRecordReader().nextKeyValue()) {
+            if (!isBlank(getRecordReader().getCurrentValue()))
+                return true;
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        // trim() so a leading separator cannot produce an empty (unparsable) id token
+        String[] fields = line.toString().trim().split(SEPARATOR);
+
+        // first token: the vertex id
+        vertexId.set(Long.parseLong(fields[0]));
+        vertex.setVertexId(vertexId);
+
+        // remaining tokens: neighbor ids; skip the empty tokens produced by repeated separators
+        for (int i = 1; i < fields.length; i++) {
+            if (fields[i].length() == 0)
+                continue;
+            VLongWritable destId = allocate();
+            destId.set(Long.parseLong(fields[i]));
+            vertex.addEdge(destId, null);
+        }
+        return vertex;
+    }
+
+    /** Hands out the next recycled VLongWritable, growing the pool when it runs out. */
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            pool.add(new VLongWritable());
+        }
+        return pool.get(used++);
+    }
+
+    /** Returns true when every byte of {@code line} is <= ' ' (the set String.trim() strips). */
+    private static boolean isBlank(Text line) {
+        byte[] bytes = line.getBytes();
+        for (int i = 0; i < line.getLength(); i++) {
+            // mask to unsigned: multi-byte UTF-8 bytes are negative as Java bytes
+            if ((bytes[i] & 0xff) > ' ')
+                return false;
+        }
+        return true;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
new file mode 100644
index 0000000..d445935
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextShortestPathsInputFormat extends
+ TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextShortestPathsGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+/**
+ * Parses one text line per vertex: the first space-separated token is the vertex
+ * id and each following token is the id of an out-neighbor. Every edge gets the
+ * same shared FloatWritable weight of 1.0. The Vertex instance, its id holder and
+ * the neighbor-id objects are recycled from record to record, so a returned
+ * vertex is only valid until the next call to getCurrentVertex().
+ */
+@SuppressWarnings("rawtypes")
+class TextShortestPathsGraphReader extends
+        TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    private static final String SEPARATOR = " ";
+    /** Vertex object reused for every record; created lazily from the job configuration. */
+    private Vertex vertex;
+    /** Weight attached to every edge; one instance shared by all edges. */
+    private FloatWritable initValue = new FloatWritable(1.0f);
+    /** Reused holder for the current vertex id. */
+    private VLongWritable vertexId = new VLongWritable();
+    /** Recycled neighbor-id objects; the first {@code used} entries belong to the current vertex. */
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextShortestPathsGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    /**
+     * Advances to the next line that contains a vertex. Blank or whitespace-only
+     * lines are skipped: parsing them would either throw NumberFormatException or
+     * (for a line of spaces) return a vertex whose id was not set from that line.
+     */
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        while (getRecordReader().nextKeyValue()) {
+            if (!isBlank(getRecordReader().getCurrentValue()))
+                return true;
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+        // NOTE(review): unlike the other text readers in this package, this one does
+        // not call vertex.reset() here -- confirm whether that is intended.
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+        Text line = getRecordReader().getCurrentValue();
+        // trim() so a leading separator cannot produce an empty (unparsable) id token
+        String[] fields = line.toString().trim().split(SEPARATOR);
+
+        // first token: the vertex id
+        vertexId.set(Long.parseLong(fields[0]));
+        vertex.setVertexId(vertexId);
+
+        // remaining tokens: neighbor ids; skip the empty tokens produced by repeated separators
+        for (int i = 1; i < fields.length; i++) {
+            if (fields[i].length() == 0)
+                continue;
+            VLongWritable destId = allocate();
+            destId.set(Long.parseLong(fields[i]));
+            vertex.addEdge(destId, initValue);
+        }
+        return vertex;
+    }
+
+    /** Hands out the next recycled VLongWritable, growing the pool when it runs out. */
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            pool.add(new VLongWritable());
+        }
+        return pool.get(used++);
+    }
+
+    /** Returns true when every byte of {@code line} is <= ' ' (the set String.trim() strips). */
+    private static boolean isBlank(Text line) {
+        byte[] bytes = line.getBytes();
+        for (int i = 0; i < line.getLength(); i++) {
+            // mask to unsigned: multi-byte UTF-8 bytes are negative as Java bytes
+            if ((bytes[i] & 0xff) > ' ')
+                return false;
+        }
+        return true;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
new file mode 100644
index 0000000..a6c2c1e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * A WritableComparable for longs stored in a variable-length format. Values are
+ * serialized with SerDeUtils.writeVLong and read back with SerDeUtils.readVLong
+ * (presumably smaller magnitudes take fewer bytes -- confirm against SerDeUtils).
+ * <p>
+ * NOTE(review): a five-byte upper bound cannot cover every 64-bit value; a
+ * Hadoop-style VLong encoding takes one to nine bytes. Confirm the exact bound
+ * against SerDeUtils before relying on it.
+ *
+ * @see SerDeUtils#readVLong(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VLongWritable implements WritableComparable {
+    private long value;
+
+    public VLongWritable() {
+    }
+
+    public VLongWritable(long value) {
+        set(value);
+    }
+
+    /** Set the value of this VLongWritable. */
+    public void set(long value) {
+        this.value = value;
+    }
+
+    /** Return the value of this VLongWritable. */
+    public long get() {
+        return value;
+    }
+
+    /** Reads the value in the variable-length encoding of SerDeUtils.readVLong. */
+    public void readFields(DataInput in) throws IOException {
+        value = SerDeUtils.readVLong(in);
+    }
+
+    /** Writes the value in the variable-length encoding of SerDeUtils.writeVLong. */
+    public void write(DataOutput out) throws IOException {
+        SerDeUtils.writeVLong(out, value);
+    }
+
+    /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
+    public boolean equals(Object o) {
+        if (!(o instanceof VLongWritable))
+            return false;
+        VLongWritable other = (VLongWritable) o;
+        return this.value == other.value;
+    }
+
+    /**
+     * Hash is the low 32 bits of the value, as in Hadoop's LongWritable. Kept as-is
+     * because hash partitioning may depend on it.
+     */
+    public int hashCode() {
+        return (int) value;
+    }
+
+    /**
+     * Compares two VLongWritables numerically.
+     *
+     * @throws ClassCastException if <code>o</code> is not a VLongWritable
+     */
+    public int compareTo(Object o) {
+        long thisValue = this.value;
+        long thatValue = ((VLongWritable) o).value;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    public String toString() {
+        return Long.toString(value);
+    }
+
+    /**
+     * A raw-bytes Comparator for VLongWritable: decodes both serialized values with
+     * SerDeUtils.readVLong and compares them numerically, without creating objects.
+     */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(VLongWritable.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            long thisValue = SerDeUtils.readVLong(b1, s1, l1);
+            long thatValue = SerDeUtils.readVLong(b2, s2, l2);
+            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+        }
+    }
+
+    /**
+     * A decreasing Comparator for VLongWritable: negates the result of
+     * {@link Comparator}, both for objects and for raw bytes.
+     */
+    public static class DecreasingComparator extends Comparator {
+        public int compare(WritableComparable a, WritableComparable b) {
+            return -super.compare(a, b);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return -super.compare(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    static { // register Comparator as the default raw comparator for VLongWritable
+        WritableComparator.define(VLongWritable.class, new Comparator());
+    }
+
+}