add the reachibility query example
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2079 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index c78327e..6ef7e13 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,6 +60,8 @@
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
public static final String NUM_EDGES = "pregelix.numEdges";
+ /** job id */
+ public static final String JOB_ID = "pregelix.jobid";
/**
* Constructor that will instantiate the configuration
@@ -134,4 +136,14 @@
final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
}
+
+ /**
+ * Set the job Id
+ *
+ * @param vertexCombinerClass
+ * Determines how vertex messages are combined
+ */
+ final public void setJobId(String jobId) {
+ getConfiguration().set(JOB_ID, jobId);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d6bf24f..7c4853f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -124,6 +124,10 @@
GlobalCountAggregator.class, GlobalAggregator.class);
}
+ public static String getJobId(Configuration conf) {
+ return conf.get(PregelixJob.JOB_ID);
+ }
+
/**
* Create a user vertex combiner class
*
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 63c1d8c..e8fdd68 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -127,7 +127,8 @@
end = System.currentTimeMillis();
time = end - start;
LOG.info("iteration " + i + " finished " + time + "ms");
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+ terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
+ || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
i++;
} while (!terminate);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 1ff8917..de29dbc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -106,6 +106,7 @@
this.conf = job.getConfiguration();
this.giraphJob = job;
this.initJobConfiguration();
+ job.setJobId(jobId);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index f1dff72..dd562ba 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -91,6 +91,22 @@
}
}
+ public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ FSDataOutputStream output = dfs.create(path, true);
+ output.writeBoolean(true);
+ output.flush();
+ output.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
throws HyracksDataException {
try {
@@ -120,6 +136,21 @@
}
}
+ public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
+ Path path = new Path(pathStr);
+ if (dfs.exists(path)) {
+ return true;
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
try {
FileSystem dfs = FileSystem.get(conf);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index b51571c..775c3ed 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
@@ -37,7 +38,7 @@
import edu.uci.ics.pregelix.example.io.VLongWritable;
/**
- * Demonstrates the basic Pregel connected components implementation.
+ * Demonstrates the basic Pregel connected components implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
*/
public class ConnectedComponentsVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
/**
@@ -91,6 +92,14 @@
public void compute(Iterator<VLongWritable> msgIterator) {
if (getSuperstep() == 1) {
minID = getVertexId().get();
+ List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+ for (int i = 0; i < edges.size(); i++) {
+ Edge<VLongWritable, FloatWritable> edge = edges.get(i);
+ long neighbor = edge.getDestVertexId().get();
+ if (minID > neighbor) {
+ minID = neighbor;
+ }
+ }
vertexValue.set(minID);
setVertexValue(vertexValue);
sendOutMsgs();
@@ -109,8 +118,10 @@
}
private void sendOutMsgs() {
- for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
- outputValue.set(minID);
+ List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+ outputValue.set(minID);
+ for (int i = 0; i < edges.size(); i++) {
+ Edge<VLongWritable, FloatWritable> edge = edges.get(i);
sendMsg(edge.getDestVertexId(), outputValue);
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
new file mode 100644
index 0000000..8c9c1ef
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel reachibility query implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ */
+public class ReachibilityVertex extends Vertex<VLongWritable, ByteWritable, FloatWritable, ByteWritable> {
+
+ public static class SimpleReachibilityCombiner extends MessageCombiner<VLongWritable, ByteWritable, ByteWritable> {
+ private ByteWritable agg = new ByteWritable();
+ private MsgList<ByteWritable> msgList;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void init(MsgList msgList) {
+ this.msgList = msgList;
+ agg.set((byte) 0);
+ }
+
+ @Override
+ public void step(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+ int newState = agg.get() | msg.get();
+ agg.set((byte) newState);
+ }
+
+ @Override
+ public void step(ByteWritable partialAggregate) throws HyracksDataException {
+ int newState = agg.get() | partialAggregate.get();
+ agg.set((byte) newState);
+ }
+
+ @Override
+ public ByteWritable finishPartial() {
+ return agg;
+ }
+
+ @Override
+ public MsgList<ByteWritable> finishFinal() {
+ msgList.clear();
+ msgList.add(agg);
+ return msgList;
+ }
+ }
+
+ private ByteWritable vertexValue = new ByteWritable();
+
+ /** The source vertex id */
+ public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
+ /** The destination vertex id */
+ public static final String DEST_ID = "ReachibilityVertex.destId";
+ /** Default source vertex id */
+ public static final long SOURCE_ID_DEFAULT = 1;
+ /** Default destination vertex id */
+ public static final long DEST_ID_DEFAULT = 1;
+
+ /**
+ * Is this vertex the source id?
+ *
+ * @return True if the source id
+ */
+ private boolean isSource(VLongWritable v) {
+ return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+ }
+
+ /**
+ * Is this vertex the dest id?
+ *
+ * @return True if the source id
+ */
+ private boolean isDest(VLongWritable v) {
+ return (v.get() == getContext().getConfiguration().getLong(DEST_ID, DEST_ID_DEFAULT));
+ }
+
+ @Override
+ public void compute(Iterator<ByteWritable> msgIterator) {
+ if (getSuperstep() == 1) {
+ boolean isSource = isSource(getVertexId());
+ if (isSource) {
+ vertexValue.set((byte) 1);
+ setVertexValue(vertexValue);
+ }
+ boolean isDest = isDest(getVertexId());
+ if (isDest) {
+ vertexValue.set((byte) 2);
+ setVertexValue(vertexValue);
+ }
+ if (isSource && isDest) {
+ signalTerminate();
+ return;
+ }
+ if (isSource || isDest) {
+ sendOutMsgs();
+ } else {
+ vertexValue.set((byte) 0);
+ setVertexValue(vertexValue);
+ }
+ } else {
+ while (msgIterator.hasNext()) {
+ ByteWritable msg = msgIterator.next();
+ int msgValue = msg.get();
+ if (msgValue < 3) {
+ int state = getVertexValue().get();
+ int newState = state | msgValue;
+ boolean changed = state == newState ? false : true;
+ if (changed) {
+ vertexValue.set((byte) newState);
+ setVertexValue(vertexValue);
+ if (newState < 3) {
+ sendOutMsgs();
+ } else {
+ signalTerminate();
+ }
+ }
+ } else {
+ signalTerminate();
+ }
+ }
+ }
+ voteToHalt();
+ }
+
+ private void signalTerminate() {
+ Configuration conf = getContext().getConfiguration();
+ try {
+ IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+ writeReachibilityResult(conf, true);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void sendOutMsgs() {
+ for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+ sendMsg(edge.getDestVertexId(), vertexValue);
+ }
+ }
+
+ private void writeReachibilityResult(Configuration conf, boolean terminate) {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ FSDataOutputStream output = dfs.create(path, true);
+ output.writeBoolean(terminate);
+ output.flush();
+ output.close();
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static boolean readReachibilityResult(Configuration conf) {
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+ Path path = new Path(pathStr);
+ if (!dfs.exists(path)) {
+ return false;
+ } else {
+ return true;
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ PregelixJob job = new PregelixJob(ReachibilityVertex.class.getSimpleName());
+ job.setVertexClass(ReachibilityVertex.class);
+ job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+ job.setMessageCombinerClass(ReachibilityVertex.SimpleReachibilityCombiner.class);
+ Client.run(args, job);
+ System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
+ }
+
+ /**
+ * Simple VertexWriter
+ */
+ public static class SimpleReachibilityVertexWriter extends
+ TextVertexWriter<VLongWritable, ByteWritable, FloatWritable> {
+ public SimpleReachibilityVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
+
+ @Override
+ public void writeVertex(Vertex<VLongWritable, ByteWritable, FloatWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ /**
+ * output format for reachibility
+ */
+ public static class SimpleReachibilityVertexOutputFormat extends
+ TextVertexOutputFormat<VLongWritable, ByteWritable, FloatWritable> {
+
+ @Override
+ public VertexWriter<VLongWritable, ByteWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+ return new SimpleReachibilityVertexWriter(recordWriter);
+ }
+
+ }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index a1463e1..b02ce03 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -28,69 +28,70 @@
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.ReachibilityVertex;
import edu.uci.ics.pregelix.example.ShortestPathsVertex;
public class Client {
- private static class Options {
- @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
- public String inputPaths;
+ private static class Options {
+ @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
+ public String inputPaths;
- @Option(name = "-outputpath", usage = "output path", required = true)
- public String outputPath;
+ @Option(name = "-outputpath", usage = "output path", required = true)
+ public String outputPath;
- @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
- public String ipAddress;
+ @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+ public String ipAddress;
- @Option(name = "-port", usage = "port of cluster controller", required = true)
- public int port;
+ @Option(name = "-port", usage = "port of cluster controller", required = true)
+ public int port;
- @Option(name = "-plan", usage = "query plan choice", required = true)
- public Plan planChoice;
+ @Option(name = "-plan", usage = "query plan choice", required = true)
+ public Plan planChoice;
- @Option(name = "-vnum", usage = "number of vertices", required = false)
- public long numVertices;
+ @Option(name = "-vnum", usage = "number of vertices", required = false)
+ public long numVertices;
- @Option(name = "-enum", usage = "number of vertices", required = false)
- public long numEdges;
+ @Option(name = "-enum", usage = "number of vertices", required = false)
+ public long numEdges;
- @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
- public long sourceId;
+ @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths/reachibility only", required = false)
+ public long sourceId;
- @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
- public long numIteration = -1;
+ @Option(name = "-dest-vertex", usage = "dest vertex id, for reachibility only", required = false)
+ public long destId;
- @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
- public String profiling = "false";
- }
+ @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+ public long numIteration = -1;
- public static void run(String[] args, PregelixJob job) throws Exception {
- Options options = prepareJob(args, job);
- Driver driver = new Driver(Client.class);
- driver.runJob(job, options.planChoice, options.ipAddress, options.port,
- Boolean.parseBoolean(options.profiling));
- }
+ @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
+ public String profiling = "false";
+ }
- private static Options prepareJob(String[] args, PregelixJob job)
- throws CmdLineException, IOException {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
+ public static void run(String[] args, PregelixJob job) throws Exception {
+ Options options = prepareJob(args, job);
+ Driver driver = new Driver(Client.class);
+ driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
+ }
- String[] inputs = options.inputPaths.split(";");
- FileInputFormat.setInputPaths(job, inputs[0]);
- for (int i = 1; i < inputs.length; i++)
- FileInputFormat.addInputPaths(job, inputs[0]);
- FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
- job.getConfiguration().setLong(PregelixJob.NUM_VERTICE,
- options.numVertices);
- job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
- job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID,
- options.sourceId);
- if (options.numIteration > 0)
- job.getConfiguration().setLong(PageRankVertex.ITERATIONS,
- options.numIteration);
- return options;
- }
+ private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+
+ String[] inputs = options.inputPaths.split(";");
+ FileInputFormat.setInputPaths(job, inputs[0]);
+ for (int i = 1; i < inputs.length; i++)
+ FileInputFormat.addInputPaths(job, inputs[0]);
+ FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
+ job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+ job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
+ job.getConfiguration().setLong(ReachibilityVertex.SOURCE_ID, options.sourceId);
+ job.getConfiguration().setLong(ReachibilityVertex.DEST_ID, options.destId);
+ if (options.numIteration > 0)
+ job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
+ return options;
+ }
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
index e7eb933..a802403 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -38,7 +38,7 @@
@Override
public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+ return new TextReachibilityGraphReader(textInputFormat.createRecordReader(split, context));
}
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
new file mode 100644
index 0000000..9ef1d49
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextReachibilityVertexInputFormat extends
+ TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+ @Override
+ public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+ }
+}
+
+@SuppressWarnings("rawtypes")
+class TextReachibilityGraphReader extends
+ TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+ private final static String separator = " ";
+ private Vertex vertex;
+ private VLongWritable vertexId = new VLongWritable();
+ private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+ private int used = 0;
+
+ public TextReachibilityGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+ InterruptedException {
+ used = 0;
+ if (vertex == null)
+ vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+ vertex.getMsgList().clear();
+ vertex.getEdges().clear();
+
+ vertex.reset();
+ Text line = getRecordReader().getCurrentValue();
+ String[] fields = line.toString().split(separator);
+
+ if (fields.length > 0) {
+ /**
+ * set the src vertex id
+ */
+ long src = Long.parseLong(fields[0]);
+ vertexId.set(src);
+ vertex.setVertexId(vertexId);
+ long dest = -1L;
+
+ /**
+ * set up edges
+ */
+ for (int i = 1; i < fields.length; i++) {
+ dest = Long.parseLong(fields[i]);
+ VLongWritable destId = allocate();
+ destId.set(dest);
+ vertex.addEdge(destId, null);
+ }
+ }
+ // vertex.sortEdges();
+ return vertex;
+ }
+
+ private VLongWritable allocate() {
+ if (used >= pool.size()) {
+ VLongWritable value = new VLongWritable();
+ pool.add(value);
+ used++;
+ return value;
+ } else {
+ VLongWritable value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 99fec55..37cc796 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -28,9 +28,12 @@
import edu.uci.ics.pregelix.example.PageRankVertex;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexInputFormat;
import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.ReachibilityVertex;
+import edu.uci.ics.pregelix.example.ReachibilityVertex.SimpleReachibilityVertexOutputFormat;
import edu.uci.ics.pregelix.example.ShortestPathsVertex;
import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
public class JobGenerator {
@@ -113,6 +116,20 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+ private static void generateReachibilityRealComplex(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(ReachibilityVertex.class);
+ job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+ job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+ job.setMessageCombinerClass(ReachibilityVertex.SimpleReachibilityCombiner.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+ job.getConfiguration().setLong(ReachibilityVertex.SOURCE_ID, 1);
+ job.getConfiguration().setLong(ReachibilityVertex.DEST_ID, 10);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
private static void generatePageRankJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(PageRankVertex.class);
@@ -156,10 +173,15 @@
+ "ConnectedComponentsRealComplex.xml");
}
+ private static void genReachibility() throws IOException {
+ generateReachibilityRealComplex("Reachibility", outputBase + "ReachibilityRealComplex.xml");
+ }
+
public static void main(String[] args) throws IOException {
genPageRank();
genShortestPath();
genConnectedComponents();
+ genReachibility();
}
}
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
new file mode 100644
index 0000000..74113a8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
@@ -0,0 +1,23 @@
+0|Vertex(id=0,value=2, edges=(1,50,))
+1|Vertex(id=1,value=3, edges=(1,2,))
+2|Vertex(id=2,value=1, edges=(1,2,3,))
+3|Vertex(id=3,value=1, edges=(1,2,3,4,))
+4|Vertex(id=4,value=1, edges=(1,2,3,4,5,))
+5|Vertex(id=5,value=1, edges=(1,2,3,4,5,6,))
+6|Vertex(id=6,value=1, edges=(1,2,3,4,5,6,7,))
+7|Vertex(id=7,value=1, edges=(1,2,3,4,5,6,7,8,))
+8|Vertex(id=8,value=1, edges=(1,2,3,4,5,6,7,8,9,))
+9|Vertex(id=9,value=1, edges=(1,2,3,4,5,6,7,8,9,10,))
+10|Vertex(id=10,value=3, edges=(11,99,))
+11|Vertex(id=11,value=2, edges=(11,12,101,))
+12|Vertex(id=12,value=2, edges=(11,12,13,))
+13|Vertex(id=13,value=2, edges=(11,12,13,14,))
+14|Vertex(id=14,value=2, edges=(11,12,13,14,15,))
+15|Vertex(id=15,value=2, edges=(11,12,13,14,15,16,))
+16|Vertex(id=16,value=2, edges=(11,12,13,14,15,16,17,))
+17|Vertex(id=17,value=2, edges=(11,12,13,14,15,16,17,18,))
+18|Vertex(id=18,value=2, edges=(11,12,13,14,15,16,17,18,19,))
+19|Vertex(id=19,value=2, edges=(0,11,12,13,14,15,16,17,18,19,))
+21|Vertex(id=21,value=0, edges=(22,23,24,))
+25|Vertex(id=25,value=0, edges=())
+27|Vertex(id=27,value=0, edges=())
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
new file mode 100644
index 0000000..e59a6f1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file