Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml b/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml
new file mode 100755
index 0000000..0500499
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
new file mode 100644
index 0000000..30e88ea
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel connected components implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ */
+public class ConnectedComponentsVertex extends Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+    /**
+     * Test whether combiner is called to get the minimum ID in the cluster
+     */
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, VLongWritable, VLongWritable> {
+        private long min = Long.MAX_VALUE;
+        private VLongWritable agg = new VLongWritable();
+        private MsgList<VLongWritable> msgList;
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
+            long value = msg.get();
+            if (min > value)
+                min = value;
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            min = Long.MAX_VALUE;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
+            if (min > partialAggregate.get())
+                min = partialAggregate.get();
+        }
+
+        @Override
+        public VLongWritable finishPartial() {
+            agg.set(min);
+            return agg;
+        }
+
+        @Override
+        public MsgList<VLongWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    private VLongWritable outputValue = new VLongWritable();
+    private VLongWritable tmpVertexValue = new VLongWritable();
+    private long minID;
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            minID = getVertexId().get();
+            List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+            for (int i = 0; i < edges.size(); i++) {
+                Edge<VLongWritable, FloatWritable> edge = edges.get(i);
+                long neighbor = edge.getDestVertexId().get();
+                if (minID > neighbor) {
+                    minID = neighbor;
+                }
+            }
+            tmpVertexValue.set(minID);
+            setVertexValue(tmpVertexValue);
+            sendOutMsgs();
+        } else {
+            minID = getVertexId().get();
+            while (msgIterator.hasNext()) {
+                minID = Math.min(minID, msgIterator.next().get());
+            }
+            if (minID < getVertexValue().get()) {
+                tmpVertexValue.set(minID);
+                setVertexValue(tmpVertexValue);
+                sendOutMsgs();
+            }
+        }
+        voteToHalt();
+    }
+
+    private void sendOutMsgs() {
+        List<Edge<VLongWritable, FloatWritable>> edges = this.getEdges();
+        outputValue.set(minID);
+        for (int i = 0; i < edges.size(); i++) {
+            Edge<VLongWritable, FloatWritable> edge = edges.get(i);
+            sendMsg(edge.getDestVertexId(), outputValue);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getSimpleName());
+        job.setVertexClass(ConnectedComponentsVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+        job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+        Client.run(args, job);
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleConnectedComponentsVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, FloatWritable> {
+        public SimpleConnectedComponentsVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * output format for connected components
+     */
+    public static class SimpleConnectedComponentsVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleConnectedComponentsVertexWriter(recordWriter);
+        }
+
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
new file mode 100644
index 0000000..290f90e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Maps;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.generated.GeneratedVertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class PageRankVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
+    private DoubleWritable outputValue = new DoubleWritable();
+    private DoubleWritable tmpVertexValue = new DoubleWritable();
+
+    /**
+     * Test whether combiner is called by summing up the messages.
+     */
+    public static class SimpleSumCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
+        private double sum = 0.0;
+        private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            sum = 0.0;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+            sum += msg.get();
+        }
+
+        @Override
+        public DoubleWritable finishPartial() {
+            agg.set(sum);
+            return agg;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
+            sum += partialAggregate.get();
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(sum);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator) {
+        int maxIteration = this.getContext().getConfiguration().getInt(ITERATIONS, 10);
+        if (getSuperstep() == 1) {
+            tmpVertexValue.set(1.0 / getNumVertices());
+            setVertexValue(tmpVertexValue);
+        }
+        if (getSuperstep() >= 2 && getSuperstep() <= maxIteration) {
+            double sum = 0;
+            while (msgIterator.hasNext()) {
+                sum += msgIterator.next().get();
+            }
+            tmpVertexValue.set((0.15 / getNumVertices()) + 0.85 * sum);
+            setVertexValue(tmpVertexValue);
+        }
+
+        if (getSuperstep() >= 1 && getSuperstep() < maxIteration) {
+            long edges = getNumOutEdges();
+            outputValue.set(getVertexValue().get() / edges);
+            sendMsgToAllEdges(outputValue);
+        } else {
+            voteToHalt();
+        }
+    }
+
+    /**
+     * Simple VertexReader that supports {@link SimplePageRankVertex}
+     */
+    public static class SimplePageRankVertexReader extends
+            GeneratedVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+        /** Class logger */
+        private static final Logger LOG = Logger.getLogger(SimplePageRankVertexReader.class.getName());
+        private Map<VLongWritable, FloatWritable> edges = Maps.newHashMap();
+
+        public SimplePageRankVertexReader() {
+            super();
+        }
+
+        @Override
+        public boolean nextVertex() {
+            return totalRecords > recordsRead;
+        }
+
+        @Override
+        public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex()
+                throws IOException {
+            Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex = BspUtils
+                    .createVertex(configuration);
+
+            VLongWritable vertexId = new VLongWritable((inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+            DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+            long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
+            float edgeValue = vertexId.get() * 100f;
+            edges.put(new VLongWritable(destVertexId), new FloatWritable(edgeValue));
+            vertex.initialize(vertexId, vertexValue, edges, null);
+            ++recordsRead;
+            if (LOG.getLevel() == Level.FINE) {
+                LOG.fine("next: Return vertexId=" + vertex.getVertexId().get() + ", vertexValue="
+                        + vertex.getVertexValue() + ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+            }
+            return vertex;
+        }
+    }
+
+    /**
+     * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
+     */
+    public static class SimplePageRankVertexInputFormat extends
+            GeneratedVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+        @Override
+        public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+                InputSplit split, TaskAttemptContext context) throws IOException {
+            return new SimplePageRankVertexReader();
+        }
+    }
+
+    /**
+     * Simple VertexWriter that supports {@link SimplePageRankVertex}
+     */
+    public static class SimplePageRankVertexWriter extends
+            TextVertexWriter<VLongWritable, DoubleWritable, FloatWritable> {
+        public SimplePageRankVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, DoubleWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
+     */
+    public static class SimplePageRankVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, DoubleWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, DoubleWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimplePageRankVertexWriter(recordWriter);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
+        job.setVertexClass(PageRankVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+        Client.run(args, job);
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
new file mode 100644
index 0000000..2f0ca45
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextReachibilityVertexInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel reachibility query implementation, for undirected graph (e.g., Facebook, LinkedIn graph).
+ */
+public class ReachabilityVertex extends Vertex<VLongWritable, ByteWritable, FloatWritable, ByteWritable> {
+
+    public static class SimpleReachibilityCombiner extends MessageCombiner<VLongWritable, ByteWritable, ByteWritable> {
+        private ByteWritable agg = new ByteWritable();
+        private MsgList<ByteWritable> msgList;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @Override
+        public void init(MsgList msgList) {
+            this.msgList = msgList;
+            agg.set((byte) 0);
+        }
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+            int newState = agg.get() | msg.get();
+            agg.set((byte) newState);
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, ByteWritable partialAggregate) throws HyracksDataException {
+            int newState = agg.get() | partialAggregate.get();
+            agg.set((byte) newState);
+        }
+
+        @Override
+        public ByteWritable finishPartial() {
+            return agg;
+        }
+
+        @Override
+        public MsgList<ByteWritable> finishFinal() {
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    private ByteWritable tmpVertexValue = new ByteWritable();
+
+    /** The source vertex id */
+    public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
+    /** The destination vertex id */
+    public static final String DEST_ID = "ReachibilityVertex.destId";
+    /** Default source vertex id */
+    public static final long SOURCE_ID_DEFAULT = 1;
+    /** Default destination vertex id */
+    public static final long DEST_ID_DEFAULT = 1;
+
+    /**
+     * Is this vertex the source id?
+     * 
+     * @return True if the source id
+     */
+    private boolean isSource(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+    }
+
+    /**
+     * Is this vertex the dest id?
+     * 
+     * @return True if the source id
+     */
+    private boolean isDest(VLongWritable v) {
+        return (v.get() == getContext().getConfiguration().getLong(DEST_ID, DEST_ID_DEFAULT));
+    }
+
+    @Override
+    public void compute(Iterator<ByteWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            boolean isSource = isSource(getVertexId());
+            if (isSource) {
+                tmpVertexValue.set((byte) 1);
+                setVertexValue(tmpVertexValue);
+            }
+            boolean isDest = isDest(getVertexId());
+            if (isDest) {
+                tmpVertexValue.set((byte) 2);
+                setVertexValue(tmpVertexValue);
+            }
+            if (isSource && isDest) {
+                signalTerminate();
+                return;
+            }
+            if (isSource || isDest) {
+                sendOutMsgs();
+            } else {
+                tmpVertexValue.set((byte) 0);
+                setVertexValue(tmpVertexValue);
+            }
+        } else {
+            while (msgIterator.hasNext()) {
+                ByteWritable msg = msgIterator.next();
+                int msgValue = msg.get();
+                if (msgValue < 3) {
+                    int state = getVertexValue().get();
+                    int newState = state | msgValue;
+                    boolean changed = state == newState ? false : true;
+                    if (changed) {
+                        tmpVertexValue.set((byte) newState);
+                        setVertexValue(tmpVertexValue);
+                        if (newState < 3) {
+                            sendOutMsgs();
+                        } else {
+                            signalTerminate();
+                        }
+                    }
+                } else {
+                    signalTerminate();
+                }
+            }
+        }
+        voteToHalt();
+    }
+
+    private void signalTerminate() {
+        Configuration conf = getContext().getConfiguration();
+        try {
+            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+            writeReachibilityResult(conf, true);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void sendOutMsgs() {
+        for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+            sendMsg(edge.getDestVertexId(), tmpVertexValue);
+        }
+    }
+
+    private void writeReachibilityResult(Configuration conf, boolean terminate) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(terminate);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private static boolean readReachibilityResult(Configuration conf) {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                return false;
+            } else {
+                return true;
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ReachabilityVertex.class.getSimpleName());
+        job.setVertexClass(ReachabilityVertex.class);
+        job.setVertexInputFormatClass(TextReachibilityVertexInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleReachibilityVertexOutputFormat.class);
+        job.setMessageCombinerClass(ReachabilityVertex.SimpleReachibilityCombiner.class);
+        Client.run(args, job);
+        System.out.println("reachable? " + readReachibilityResult(job.getConfiguration()));
+    }
+
+    /**
+     * Simple VertexWriter
+     */
+    public static class SimpleReachibilityVertexWriter extends
+            TextVertexWriter<VLongWritable, ByteWritable, FloatWritable> {
+        public SimpleReachibilityVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, ByteWritable, FloatWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    /**
+     * output format for reachibility
+     */
+    public static class SimpleReachibilityVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, ByteWritable, FloatWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, ByteWritable, FloatWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleReachibilityVertexWriter(recordWriter);
+        }
+
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
new file mode 100644
index 0000000..a018f08
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Edge;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.inputformat.TextShortestPathsInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel shortest paths implementation.
+ */
+public class ShortestPathsVertex extends Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+    /**
+     * Test whether combiner is called by summing up the messages.
+     */
+    public static class SimpleMinCombiner extends MessageCombiner<VLongWritable, DoubleWritable, DoubleWritable> {
+        private double min = Double.MAX_VALUE;
+        private DoubleWritable agg = new DoubleWritable();
+        private MsgList<DoubleWritable> msgList;
+
+        @Override
+        public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+            double value = msg.get();
+            if (min > value)
+                min = value;
+        }
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        @Override
+        public void init(MsgList msgList) {
+            min = Double.MAX_VALUE;
+            this.msgList = msgList;
+        }
+
+        @Override
+        public DoubleWritable finishPartial() {
+            agg.set(min);
+            return agg;
+        }
+
+        @Override
+        public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
+            double value = partialAggregate.get();
+            if (min > value)
+                min = value;
+        }
+
+        @Override
+        public MsgList<DoubleWritable> finishFinal() {
+            agg.set(min);
+            msgList.clear();
+            msgList.add(agg);
+            return msgList;
+        }
+    }
+
+    private DoubleWritable outputValue = new DoubleWritable();
+    private DoubleWritable tmpVertexValue = new DoubleWritable();
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(ShortestPathsVertex.class.getName());
+    /** The shortest paths id */
+    public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
+    /** Default shortest paths id */
+    public static final long SOURCE_ID_DEFAULT = 1;
+
+    /**
+     * Is this vertex the source id?
+     * 
+     * @return True if the source id
+     */
+    private boolean isSource() {
+        return (getVertexId().get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+    }
+
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            tmpVertexValue.set(Double.MAX_VALUE);
+            setVertexValue(tmpVertexValue);
+        }
+        double minDist = isSource() ? 0d : Double.MAX_VALUE;
+        while (msgIterator.hasNext()) {
+            minDist = Math.min(minDist, msgIterator.next().get());
+        }
+        if (LOG.getLevel() == Level.FINE) {
+            LOG.fine("Vertex " + getVertexId() + " got minDist = " + minDist + " vertex value = " + getVertexValue());
+        }
+        if (minDist < getVertexValue().get()) {
+            tmpVertexValue.set(minDist);
+            setVertexValue(tmpVertexValue);
+            for (Edge<VLongWritable, FloatWritable> edge : getEdges()) {
+                if (LOG.getLevel() == Level.FINE) {
+                    LOG.fine("Vertex " + getVertexId() + " sent to " + edge.getDestVertexId() + " = "
+                            + (minDist + edge.getEdgeValue().get()));
+                }
+                outputValue.set(minDist + edge.getEdgeValue().get());
+                sendMsg(edge.getDestVertexId(), outputValue);
+            }
+        }
+        voteToHalt();
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(ShortestPathsVertex.class.getSimpleName());
+        job.setVertexClass(ShortestPathsVertex.class);
+        job.setVertexInputFormatClass(TextShortestPathsInputFormat.class);
+        job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+        job.setMessageCombinerClass(ShortestPathsVertex.SimpleMinCombiner.class);
+        job.getConfiguration().setLong(SOURCE_ID, 0);
+        Client.run(args, job);
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
new file mode 100644
index 0000000..68b7cca
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class VertexAggregator {
+
+    public static class MapRecordOnly extends MapReduceBase implements
+            Mapper<LongWritable, Text, NullWritable, LongWritable> {
+        private final NullWritable nullValue = NullWritable.get();
+        private final LongWritable count = new LongWritable(1);
+
+        public void map(LongWritable id, Text inputValue, OutputCollector<NullWritable, LongWritable> output,
+                Reporter reporter) throws IOException {
+            output.collect(nullValue, count);
+        }
+    }
+
+    public static class CombineRecordOnly extends MapReduceBase implements
+            Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
+        private final NullWritable nullValue = NullWritable.get();
+
+        public void reduce(NullWritable inputKey, Iterator<LongWritable> inputValue,
+                OutputCollector<NullWritable, LongWritable> output, Reporter reporter) throws IOException {
+            long count = 0;
+            while (inputValue.hasNext())
+                count += inputValue.next().get();
+            output.collect(nullValue, new LongWritable(count));
+        }
+    }
+
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<NullWritable, LongWritable, NullWritable, Text> {
+        private final NullWritable nullValue = NullWritable.get();
+
+        public void reduce(NullWritable inputKey, Iterator<LongWritable> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            long count = 0;
+            while (inputValue.hasNext())
+                count += inputValue.next().get();
+            output.collect(nullValue, new Text(Long.toString(count)));
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        JobConf job = new JobConf(VertexAggregator.class);
+
+        job.setJobName(VertexAggregator.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setCombinerClass(CombineRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(LongWritable.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+        JobClient.runJob(job);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
new file mode 100644
index 0000000..1dd6922
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/VertexSorter.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class VertexSorter {
+    public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+        private static String separator = " ";
+
+        public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+                throws IOException {
+            String[] fields = inputValue.toString().split(separator);
+            LongWritable vertexId = new LongWritable(Long.parseLong(fields[0]));
+            output.collect(vertexId, inputValue);
+        }
+    }
+
+    public static class ReduceRecordOnly extends MapReduceBase implements
+            Reducer<LongWritable, Text, NullWritable, Text> {
+
+        NullWritable key = NullWritable.get();
+
+        public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+                OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+            while (inputValue.hasNext())
+                output.collect(key, inputValue.next());
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        JobConf job = new JobConf(VertexSorter.class);
+
+        job.setJobName(VertexSorter.class.getSimpleName());
+        job.setMapperClass(MapRecordOnly.class);
+        job.setReducerClass(ReduceRecordOnly.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.setInputFormat(TextInputFormat.class);
+        FileInputFormat.setInputPaths(job, args[0]);
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.setNumReduceTasks(Integer.parseInt(args[2]));
+        JobClient.runJob(job);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
new file mode 100644
index 0000000..597ad6e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/client/Client.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.example.PageRankVertex;
+import edu.uci.ics.pregelix.example.ReachabilityVertex;
+import edu.uci.ics.pregelix.example.ShortestPathsVertex;
+
+public class Client {
+
+    private static class Options {
+        @Option(name = "-inputpaths", usage = "comma seprated input paths", required = true)
+        public String inputPaths;
+
+        @Option(name = "-outputpath", usage = "output path", required = true)
+        public String outputPath;
+
+        @Option(name = "-ip", usage = "ip address of cluster controller", required = true)
+        public String ipAddress;
+
+        @Option(name = "-port", usage = "port of cluster controller", required = false)
+        public int port;
+
+        @Option(name = "-plan", usage = "query plan choice", required = false)
+        public Plan planChoice = Plan.OUTER_JOIN;
+
+        @Option(name = "-vnum", usage = "number of vertices", required = false)
+        public long numVertices;
+
+        @Option(name = "-enum", usage = "number of vertices", required = false)
+        public long numEdges;
+
+        @Option(name = "-source-vertex", usage = "source vertex id, for shortest paths/reachibility only", required = false)
+        public long sourceId;
+
+        @Option(name = "-dest-vertex", usage = "dest vertex id, for reachibility only", required = false)
+        public long destId;
+
+        @Option(name = "-num-iteration", usage = "max number of iterations, for pagerank job only", required = false)
+        public long numIteration = -1;
+
+        @Option(name = "-runtime-profiling", usage = "whether to do runtime profifling", required = false)
+        public String profiling = "false";
+    }
+
+    public static void run(String[] args, PregelixJob job) throws Exception {
+        Options options = prepareJob(args, job);
+        Driver driver = new Driver(Client.class);
+        driver.runJob(job, options.planChoice, options.ipAddress, options.port, Boolean.parseBoolean(options.profiling));
+    }
+
+    private static Options prepareJob(String[] args, PregelixJob job) throws CmdLineException, IOException {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        String[] inputs = options.inputPaths.split(";");
+        FileInputFormat.setInputPaths(job, inputs[0]);
+        for (int i = 1; i < inputs.length; i++)
+            FileInputFormat.addInputPaths(job, inputs[0]);
+        FileOutputFormat.setOutputPath(job, new Path(options.outputPath));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, options.numVertices);
+        job.getConfiguration().setLong(PregelixJob.NUM_EDGES, options.numEdges);
+        job.getConfiguration().setLong(ShortestPathsVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachabilityVertex.SOURCE_ID, options.sourceId);
+        job.getConfiguration().setLong(ReachabilityVertex.DEST_ID, options.destId);
+        if (options.numIteration > 0)
+            job.getConfiguration().setLong(PageRankVertex.ITERATIONS, options.numIteration);
+        return options;
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
new file mode 100644
index 0000000..a802403
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextConnectedComponentsInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextConnectedComponentsInputFormat extends
+        TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextReachibilityGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class TextConnectedComponentsGraphReader extends
+        TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    private final static String separator = " ";
+    private Vertex vertex;
+    private VLongWritable vertexId = new VLongWritable();
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextConnectedComponentsGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        String[] fields = line.toString().split(separator);
+
+        if (fields.length > 0) {
+            /**
+             * set the src vertex id
+             */
+            long src = Long.parseLong(fields[0]);
+            vertexId.set(src);
+            vertex.setVertexId(vertexId);
+            long dest = -1L;
+
+            /**
+             * set up edges
+             */
+            for (int i = 1; i < fields.length; i++) {
+                dest = Long.parseLong(fields[i]);
+                VLongWritable destId = allocate();
+                destId.set(dest);
+                vertex.addEdge(destId, null);
+            }
+        }
+        // vertex.sortEdges();
+        return vertex;
+    }
+
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            VLongWritable value = new VLongWritable();
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            VLongWritable value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
new file mode 100644
index 0000000..a8a752e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextPageRankInputFormat extends
+        TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class TextPageRankGraphReader extends TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    private final static String separator = " ";
+    private Vertex vertex;
+    private VLongWritable vertexId = new VLongWritable();
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        String[] fields = line.toString().split(separator);
+
+        if (fields.length > 0) {
+            /**
+             * set the src vertex id
+             */
+            long src = Long.parseLong(fields[0]);
+            vertexId.set(src);
+            vertex.setVertexId(vertexId);
+            long dest = -1L;
+
+            /**
+             * set up edges
+             */
+            for (int i = 1; i < fields.length; i++) {
+                dest = Long.parseLong(fields[i]);
+                VLongWritable destId = allocate();
+                destId.set(dest);
+                vertex.addEdge(destId, null);
+            }
+        }
+        // vertex.sortEdges();
+        return vertex;
+    }
+
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            VLongWritable value = new VLongWritable();
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            VLongWritable value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
new file mode 100644
index 0000000..9ef1d49
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextReachibilityVertexInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextReachibilityVertexInputFormat extends
+        TextVertexInputFormat<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextConnectedComponentsGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class TextReachibilityGraphReader extends
+        TextVertexReader<VLongWritable, VLongWritable, FloatWritable, VLongWritable> {
+
+    private final static String separator = " ";
+    private Vertex vertex;
+    private VLongWritable vertexId = new VLongWritable();
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextReachibilityGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, VLongWritable, FloatWritable, VLongWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+
+        vertex.reset();
+        Text line = getRecordReader().getCurrentValue();
+        String[] fields = line.toString().split(separator);
+
+        if (fields.length > 0) {
+            /**
+             * set the src vertex id
+             */
+            long src = Long.parseLong(fields[0]);
+            vertexId.set(src);
+            vertex.setVertexId(vertexId);
+            long dest = -1L;
+
+            /**
+             * set up edges
+             */
+            for (int i = 1; i < fields.length; i++) {
+                dest = Long.parseLong(fields[i]);
+                VLongWritable destId = allocate();
+                destId.set(dest);
+                vertex.addEdge(destId, null);
+            }
+        }
+        // vertex.sortEdges();
+        return vertex;
+    }
+
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            VLongWritable value = new VLongWritable();
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            VLongWritable value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
new file mode 100644
index 0000000..d445935
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextShortestPathsInputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.inputformat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexInputFormat.TextVertexReader;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+public class TextShortestPathsInputFormat extends
+        TextVertexInputFormat<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    @Override
+    public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
+            InputSplit split, TaskAttemptContext context) throws IOException {
+        return new TextShortestPathsGraphReader(textInputFormat.createRecordReader(split, context));
+    }
+}
+
+@SuppressWarnings("rawtypes")
+class TextShortestPathsGraphReader extends
+        TextVertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+    private final static String separator = " ";
+    private Vertex vertex;
+    private FloatWritable initValue = new FloatWritable(1.0f);
+    private VLongWritable vertexId = new VLongWritable();
+    private List<VLongWritable> pool = new ArrayList<VLongWritable>();
+    private int used = 0;
+
+    public TextShortestPathsGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+        super(lineRecordReader);
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+        return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> getCurrentVertex() throws IOException,
+            InterruptedException {
+        used = 0;
+        if (vertex == null)
+            vertex = (Vertex) BspUtils.createVertex(getContext().getConfiguration());
+
+        vertex.getMsgList().clear();
+        vertex.getEdges().clear();
+        Text line = getRecordReader().getCurrentValue();
+        String[] fields = line.toString().split(separator);
+
+        if (fields.length > 0) {
+            /**
+             * set the src vertex id
+             */
+            long src = Long.parseLong(fields[0]);
+            vertexId.set(src);
+            vertex.setVertexId(vertexId);
+            long dest = -1L;
+
+            /**
+             * set up edges
+             */
+            for (int i = 1; i < fields.length; i++) {
+                dest = Long.parseLong(fields[i]);
+                VLongWritable destId = allocate();
+                destId.set(dest);
+                vertex.addEdge(destId, initValue);
+            }
+        }
+        // vertex.sortEdges();
+        return vertex;
+    }
+
+    private VLongWritable allocate() {
+        if (used >= pool.size()) {
+            VLongWritable value = new VLongWritable();
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            VLongWritable value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
new file mode 100644
index 0000000..a6c2c1e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/io/VLongWritable.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * A WritableComparable for longs in a variable-length format. Such values take
+ * between one and five bytes. Smaller values take fewer bytes.
+ * 
+ * @see org.apache.hadoop.io.WritableUtils#readVLong(DataInput)
+ */
+@SuppressWarnings("rawtypes")
+public class VLongWritable implements WritableComparable {
+    private long value;
+
+    public VLongWritable() {
+    }
+
+    public VLongWritable(long value) {
+        set(value);
+    }
+
+    /** Set the value of this LongWritable. */
+    public void set(long value) {
+        this.value = value;
+    }
+
+    /** Return the value of this LongWritable. */
+    public long get() {
+        return value;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        value = SerDeUtils.readVLong(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+        SerDeUtils.writeVLong(out, value);
+    }
+
+    /** Returns true iff <code>o</code> is a VLongWritable with the same value. */
+    public boolean equals(Object o) {
+        if (!(o instanceof VLongWritable))
+            return false;
+        VLongWritable other = (VLongWritable) o;
+        return this.value == other.value;
+    }
+
+    public int hashCode() {
+        return (int) value;
+    }
+
+    /** Compares two VLongWritables. */
+    public int compareTo(Object o) {
+        long thisValue = this.value;
+        long thatValue = ((VLongWritable) o).value;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    public String toString() {
+        return Long.toString(value);
+    }
+
+    /** A Comparator optimized for LongWritable. */
+    public static class Comparator extends WritableComparator {
+        public Comparator() {
+            super(VLongWritable.class);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            long thisValue = SerDeUtils.readVLong(b1, s1, l1);
+            long thatValue = SerDeUtils.readVLong(b2, s2, l2);
+            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+        }
+    }
+
+    /** A decreasing Comparator optimized for LongWritable. */
+    public static class DecreasingComparator extends Comparator {
+        public int compare(WritableComparable a, WritableComparable b) {
+            return -super.compare(a, b);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return -super.compare(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    static { // register default comparator
+        WritableComparator.define(VLongWritable.class, new Comparator());
+    }
+
+}