add the reachibility query example

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2079 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index c78327e..6ef7e13 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,6 +60,8 @@
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
     public static final String NUM_EDGES = "pregelix.numEdges";
+    /** job id */
+    public static final String JOB_ID = "pregelix.jobid";
 
     /**
      * Constructor that will instantiate the configuration
@@ -134,4 +136,14 @@
     final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
         getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
     }
+
+    /**
+     * Set the job Id
+     * 
+     * @param vertexCombinerClass
+     *            Determines how vertex messages are combined
+     */
+    final public void setJobId(String jobId) {
+        getConfiguration().set(JOB_ID, jobId);
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d6bf24f..7c4853f 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -124,6 +124,10 @@
                 GlobalCountAggregator.class, GlobalAggregator.class);
     }
 
+    public static String getJobId(Configuration conf) {
+        return conf.get(PregelixJob.JOB_ID);
+    }
+
     /**
      * Create a user vertex combiner class
      * 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 63c1d8c..e8fdd68 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -127,7 +127,8 @@
                 end = System.currentTimeMillis();
                 time = end - start;
                 LOG.info("iteration " + i + " finished " + time + "ms");
-                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+                terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
+                        || IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
                 i++;
             } while (!terminate);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 1ff8917..de29dbc 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -106,6 +106,7 @@
         this.conf = job.getConfiguration();
         this.giraphJob = job;
         this.initJobConfiguration();
+        job.setJobId(jobId);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index f1dff72..dd562ba 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -91,6 +91,22 @@
         }
     }
 
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(true);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
     public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
             throws HyracksDataException {
         try {
@@ -120,6 +136,21 @@
         }
     }
 
+    public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (dfs.exists(path)) {
+                return true;
+            } else {
+                return false;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
     public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index b51571c..775c3ed 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
@@ -37,7 +38,7 @@
 import edu.uci.ics.pregelix.example.io.VLongWritable;
 
 /**
- * Demonstrates the basic Pregel connected components implementation.
+ * Demonstrates the basic Pregel connected components implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
  */
 public class ConnectedComponentsVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
     /**
@@ -91,6 +92,14 @@
     public void compute(Iterator<VLongWritable> msgIterator) {
         if (getSuperstep() == 1) {
             minID = getVertexId().get();
+            List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+            for (int i = 0; i < edges.size(); i++) {
+                Edge<VLongWritable, FloatWritable> edge = edges.get(i);
+                long neighbor = edge.getDestVertexId().get();
+                if (minID > neighbor) {
+                    minID = neighbor;
+                }
+            }
             vertexValue.set(minID);
             setVertexValue(vertexValue);
             sendOutMsgs();
@@ -109,8 +118,10 @@
     }
 
     private void sendOutMsgs() {
-        for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
-            outputValue.set(minID);
+        List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+        outputValue.set(minID);
+        for (int i = 0; i < edges.size(); i++) {
+            Edge<VLongWritable, FloatWritable> edge = edges.get(i);
             sendMsg(edge.getDestVertexId(), outputValue);
         }
     }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
new file mode 100644
index 0000000..8c9c1ef
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel reachibility query implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ */
+public class ReachibilityVertex extends Vertex<VLongWritable, ByteWritable, FloatWritable, ByteWritable> {
+
+    public static class SimpleReachibilityCombiner extends MessageCombiner<VLongWritable, ByteWritable, ByteWritable> {
+        private ByteWritable agg = new ByteWritable();
+        private MsgList<ByteWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            this.msgList = msgList;
+            agg.set((byte) 0);
+        }
+
+        @Override
+        public void step(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+            int newState = agg.get() | msg.get();
+            agg.set((byte) newState);
+        }
+
+        @Override
+        public void step(ByteWritable partialAggregate) throws HyracksDataException {
+            int newState = agg.get() | partialAggregate.get();
+            agg.set((byte) newState);
+        }
+
+        @Override
+        public ByteWritable finishPartial() {
+            return agg;
+        }
+
+        @Override
+        public MsgList<ByteWritable> finishFinal() {
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    private ByteWritable vertexValue = new ByteWritable();
+
+    /** The source vertex id */
+    public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
+    /** The destination vertex id */
+    public static final String DEST_ID = "ReachibilityVertex.destId";
+    /** Default source vertex id */
+    public static final long SOURCE_ID_DEFAULT = 1;
+    /** Default destination vertex id */
+    public static final long DEST_ID_DEFAULT = 1;
+
+    /**
+     * Is this vertex the source id?
+     * 
+     * @return True if the source id
+     */
+    private boolean isSource(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+    }
+
+    /**
+     * Is this vertex the dest id?
+     * 
+     * @return True if the source id
+     */
+    private boolean isDest(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(DEST_ID, DEST_ID_DEFAULT));
+    }
+
+    @Override
+    public void compute(Iterator<ByteWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            boolean isSource = isSource(getVertexId());
+            if (isSource) {
+                vertexValue.set((byte) 1);
+                setVertexValue(vertexValue);
+            }
+            boolean isDest = isDest(getVertexId());
+            if (isDest) {
+                vertexValue.set((byte) 2);
+                setVertexValue(vertexValue);
+            }
+            if (isSource && isDest) {
+                signalTerminate();
+                return;
+            }
+            if (isSource || isDest) {
+                sendOutMsgs();
+            } else {
+                vertexValue.set((byte) 0);
+                setVertexValue(vertexValue);
+            }
+        } else {
+            while (msgIterator.hasNext()) {
+                ByteWritable msg = msgIterator.next();
+                int msgValue = msg.get();
+                if (msgValue < 3) {
+                    int state = getVertexValue().get();
+                    int newState = state | msgValue;
+                    boolean changed = state == newState ? false : true;
+                    if (changed) {
+                        vertexValue.set((byte) newState);
+                        setVertexValue(vertexValue);
+                        if (newState < 3) {
+                            sendOutMsgs();
+                        } else {
+                            signalTerminate();
+                        }
+                    }
+                } else {
+                    signalTerminate();
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    private void signalTerminate() {
+        Configuration conf = getContext().getConfiguration();
+        try {
+            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+            writeReachibilityResult(conf, true);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void sendOutMsgs() {
+        for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+            sendMsg(edge.getDestVertexId(), vertexValue);
+        }
+    }
+
+    private void writeReachibilityResult(Configuration conf, boolean terminate) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(terminate);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private static boolean readReachibilityResult(Configuration conf) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                return false;
+            } else {
+                return true;
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ReachibilityVertex.class.getSimpleName());
+        job.setVertexClass(ReachibilityVertex.class);
+        job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+        job.setMessageCombinerClass(ReachibilityVertex.SimpleReachibilityCombiner.class);
+        Client.run(args, job);
+        System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
+    }
+
+    /**
+     * Simple VertexWriter
+     */
+    public static class SimpleReachibilityVertexWriter extends
+            TextVertexWriter<VLongWritable, ByteWritable, FloatWritable> {
+        public SimpleReachibilityVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, ByteWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * output format for reachibility
+     */
+    public static class SimpleReachibilityVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, ByteWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, ByteWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleReachibilityVertexWriter(recordWriter);
+        }
+
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
index a1463e1..b02ce03 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -28,69 +28,70 @@
 import edu.uci.ics.pregelix.core.base.IDriver.Plan;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.ReachibilityVertex;
 import edu.uci.ics.pregelix.example.ShortestPathsVertex;
 
 public class Client {
 
-	private static class Options {
-		@Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
-		public String inputPaths;
+    private static class Options {
+        @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
+        public String inputPaths;
 
-		@Option(name = "-outputpath", usage = "output path", required = true)
-		public String outputPath;
+        @Option(name = "-outputpath", usage = "output path", required = true)
+        public String outputPath;
 
-		@Option(name = "-ip", usage = "ip address of cluster controller", required = true)
-		public String ipAddress;
+        @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+        public String ipAddress;
 
-		@Option(name = "-port", usage = "port of cluster controller", required = true)
-		public int port;
+        @Option(name = "-port", usage = "port of cluster controller", required = true)
+        public int port;
 
-		@Option(name = "-plan", usage = "query plan choice", required = true)
-		public Plan planChoice;
+        @Option(name = "-plan", usage = "query plan choice", required = true)
+        public Plan planChoice;
 
-		@Option(name = "-vnum", usage = "number of vertices", required = false)
-		public long numVertices;
+        @Option(name = "-vnum", usage = "number of vertices", required = false)
+        public long numVertices;
 
-		@Option(name = "-enum", usage = "number of vertices", required = false)
-		public long numEdges;
+        @Option(name = "-enum", usage = "number of vertices", required = false)
+        public long numEdges;
 
-		@Option(name = "-source-vertex", usage = "source vertex id, for shortest paths only", required = false)
-		public long sourceId;
+        @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths/reachibility only", required = false)
+        public long sourceId;
 
-		@Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
-		public long numIteration = -1;
+        @Option(name = "-dest-vertex", usage = "dest vertex id, for reachibility only", required = false)
+        public long destId;
 
-		@Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
-		public String profiling = "false";
-	}
+        @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+        public long numIteration = -1;
 
-	public static void run(String[] args, PregelixJob job) throws Exception {
-		Options options = prepareJob(args, job);
-		Driver driver = new Driver(Client.class);
-		driver.runJob(job, options.planChoice, options.ipAddress, options.port,
-				Boolean.parseBoolean(options.profiling));
-	}
+        @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
+        public String profiling = "false";
+    }
 
-	private static Options prepareJob(String[] args, PregelixJob job)
-			throws CmdLineException, IOException {
-		Options options = new Options();
-		CmdLineParser parser = new CmdLineParser(options);
-		parser.parseArgument(args);
+    public static void run(String[] args, PregelixJob job) throws Exception {
+        Options options = prepareJob(args, job);
+        Driver driver = new Driver(Client.class);
+        driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
+    }
 
-		String[] inputs = options.inputPaths.split(";");
-		FileInputFormat.setInputPaths(job, inputs[0]);
-		for (int i = 1; i < inputs.length; i++)
-			FileInputFormat.addInputPaths(job, inputs[0]);
-		FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
-		job.getConfiguration().setLong(PregelixJob.NUM_VERTICE,
-				options.numVertices);
-		job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
-		job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID,
-				options.sourceId);
-		if (options.numIteration > 0)
-			job.getConfiguration().setLong(PageRankVertex.ITERATIONS,
-					options.numIteration);
-		return options;
-	}
+    private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        String[] inputs = options.inputPaths.split(";");
+        FileInputFormat.setInputPaths(job, inputs[0]);
+        for (int i = 1; i < inputs.length; i++)
+            FileInputFormat.addInputPaths(job, inputs[0]);
+        FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
+        job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+        job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachibilityVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachibilityVertex.DEST_ID, options.destId);
+        if (options.numIteration > 0)
+            job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
+        return options;
+    }
 
 }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
index e7eb933..a802403 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -38,7 +38,7 @@
     @Override
     public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
             InputSplit split, TaskAttemptContext context) throws IOException {
-        return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+        return new TextReachibilityGraphReader(textInputFormat.createRecordReader(split, context));
     }
 }
 
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
new file mode 100644
index 0000000..9ef1d49
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextReachibilityVertexInputFormat extends
+        TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class TextReachibilityGraphReader extends
+        TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    private final static String separator = " ";
+    private Vertex vertex;
+    private VLongWritable vertexId = new VLongWritable();
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextReachibilityGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        String[] fields = line.toString().split(separator);
+
+        if (fields.length > 0) {
+            /**
+             * set the src vertex id
+             */
+            long src = Long.parseLong(fields[0]);
+            vertexId.set(src);
+            vertex.setVertexId(vertexId);
+            long dest = -1L;
+
+            /**
+             * set up edges
+             */
+            for (int i = 1; i < fields.length; i++) {
+                dest = Long.parseLong(fields[i]);
+                VLongWritable destId = allocate();
+                destId.set(dest);
+                vertex.addEdge(destId, null);
+            }
+        }
+        // vertex.sortEdges();
+        return vertex;
+    }
+
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            VLongWritable value = new VLongWritable();
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            VLongWritable value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 99fec55..37cc796 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -28,9 +28,12 @@
 import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexInputFormat;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.ReachibilityVertex;
+import edu.uci.ics.pregelix.example.ReachibilityVertex.SimpleReachibilityVertexOutputFormat;
 import edu.uci.ics.pregelix.example.ShortestPathsVertex;
 import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
 import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
 
 public class JobGenerator {
@@ -113,6 +116,20 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generateReachibilityRealComplex(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(ReachibilityVertex.class);
+        job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+        job.setMessageCombinerClass(ReachibilityVertex.SimpleReachibilityCombiner.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+        job.getConfiguration().setLong(ReachibilityVertex.SOURCE_ID, 1);
+        job.getConfiguration().setLong(ReachibilityVertex.DEST_ID, 10);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void generatePageRankJob(String jobName, String outputPath) throws IOException {
         PregelixJob job = new PregelixJob(jobName);
         job.setVertexClass(PageRankVertex.class);
@@ -156,10 +173,15 @@
                 + "ConnectedComponentsRealComplex.xml");
     }
 
+    private static void genReachibility() throws IOException {
+        generateReachibilityRealComplex("Reachibility", outputBase + "ReachibilityRealComplex.xml");
+    }
+
     public static void main(String[] args) throws IOException {
         genPageRank();
         genShortestPath();
         genConnectedComponents();
+        genReachibility();
     }
 
 }
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
new file mode 100644
index 0000000..74113a8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
@@ -0,0 +1,23 @@
+0|Vertex(id=0,value=2, edges=(1,50,))
+1|Vertex(id=1,value=3, edges=(1,2,))
+2|Vertex(id=2,value=1, edges=(1,2,3,))
+3|Vertex(id=3,value=1, edges=(1,2,3,4,))
+4|Vertex(id=4,value=1, edges=(1,2,3,4,5,))
+5|Vertex(id=5,value=1, edges=(1,2,3,4,5,6,))
+6|Vertex(id=6,value=1, edges=(1,2,3,4,5,6,7,))
+7|Vertex(id=7,value=1, edges=(1,2,3,4,5,6,7,8,))
+8|Vertex(id=8,value=1, edges=(1,2,3,4,5,6,7,8,9,))
+9|Vertex(id=9,value=1, edges=(1,2,3,4,5,6,7,8,9,10,))
+10|Vertex(id=10,value=3, edges=(11,99,))
+11|Vertex(id=11,value=2, edges=(11,12,101,))
+12|Vertex(id=12,value=2, edges=(11,12,13,))
+13|Vertex(id=13,value=2, edges=(11,12,13,14,))
+14|Vertex(id=14,value=2, edges=(11,12,13,14,15,))
+15|Vertex(id=15,value=2, edges=(11,12,13,14,15,16,))
+16|Vertex(id=16,value=2, edges=(11,12,13,14,15,16,17,))
+17|Vertex(id=17,value=2, edges=(11,12,13,14,15,16,17,18,))
+18|Vertex(id=18,value=2, edges=(11,12,13,14,15,16,17,18,19,))
+19|Vertex(id=19,value=2, edges=(0,11,12,13,14,15,16,17,18,19,))
+21|Vertex(id=21,value=0, edges=(22,23,24,))
+25|Vertex(id=25,value=0, edges=())
+27|Vertex(id=27,value=0, edges=())
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
new file mode 100644
index 0000000..e59a6f1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/ReachibilityRealComplex.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmapcomplex</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultcomplex</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Reachibility</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>23</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>ReachibilityVertex.destId</name><value>10</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex$SimpleReachibilityCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.ReachibilityVertex$SimpleReachibilityVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>ReachibilityVertex.sourceId</name><value>1</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file