add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
new file mode 100644
index 0000000..0c90fc4
--- /dev/null
+++ b/pregelix/pregelix-api/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>pregelix-api</artifactId>
+	<name>pregelix-api</name>
+
+	<parent>
+		<groupId>edu.uci.ics.pregelix</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.2</version>
+				<configuration>
+					<outputDirectory>target</outputDirectory>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.6</version>
+				<configuration>
+					<systemProperties>
+						<property>
+							<name>prop.jarLocation</name>
+							<value>target/pregelix-example-${project.version}-jar-with-dependencies.jar</value>
+						</property>
+					</systemProperties>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+								<include>edu.uci.*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
new file mode 100644
index 0000000..5ebf9df
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.delegate;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+
+@SuppressWarnings("rawtypes")
+public class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private Vertex vertex = null;
+
+    /** message tuple builder */
+    private ArrayTupleBuilder message;
+    private IFrameWriter msgWriter;
+    private FrameTupleAppender appenderMsg;
+
+    /** alive tuple builder */
+    private ArrayTupleBuilder alive;
+    private IFrameWriter aliveWriter;
+    private FrameTupleAppender appenderAlive;
+
+    /** message list */
+    private MsgList dummyMessageList = new MsgList();
+    /** whether alive message should be pushed out */
+    private boolean pushAlive;
+
+    public VertexDelegate(Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public void finishCompute() throws IOException {
+        // package alive info
+        if (pushAlive && !vertex.isHalted()) {
+            alive.reset();
+            DataOutput outputAlive = alive.getDataOutput();
+            vertexId.write(outputAlive);
+            alive.addFieldEndOffset();
+            dummyMessageList.write(outputAlive);
+            alive.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+        }
+    }
+
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+
+        /**
+         * send out message along message channel
+         */
+        try {
+            message.reset();
+            DataOutput outputMsg = message.getDataOutput();
+            id.write(outputMsg);
+            message.addFieldEndOffset();
+            msg.write(outputMsg);
+            message.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderMsg, message, msgWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public final void setVertex(Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+    }
+
+    public final void setOutputWriters(List<IFrameWriter> outputs) {
+        msgWriter = outputs.get(0);
+        if (outputs.size() > 1) {
+            aliveWriter = outputs.get(1);
+            pushAlive = true;
+        }
+    }
+
+    public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        appenderMsg = appenders.get(0);
+        if (appenders.size() > 1) {
+            appenderAlive = appenders.get(1);
+        }
+    }
+
+    public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        message = tbs.get(0);
+        if (tbs.size() > 1) {
+            alive = tbs.get(1);
+        }
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
new file mode 100644
index 0000000..84bc8de
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * A complete edge, the destination vertex and the edge value. Can only be one
+ * edge with a destination vertex id per edge map.
+ * 
+ * @param <I>
+ *            Vertex index
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class Edge<I extends WritableComparable, E extends Writable> implements Writable, Configurable, Comparable {
+    /** Destination vertex id */
+    private I destVertexId = null;
+    /** Edge value */
+    private E edgeValue = null;
+    /** Configuration - Used to instiantiate classes */
+    private Configuration conf = null;
+    private boolean hasEdgeValue = false;
+
+    /**
+     * Constructor for reflection
+     */
+    public Edge() {
+    }
+
+    /**
+     * Create the edge with final values
+     * 
+     * @param destVertexId
+     * @param edgeValue
+     */
+    public Edge(I destVertexId, E edgeValue) {
+        this.destVertexId = destVertexId;
+        this.edgeValue = edgeValue;
+        if (edgeValue != null)
+            hasEdgeValue = true;
+    }
+
+    /**
+     * Get the destination vertex index of this edge
+     * 
+     * @return Destination vertex index of this edge
+     */
+    public I getDestVertexId() {
+        return destVertexId;
+    }
+
+    /**
+     * set the destination vertex id
+     * 
+     * @param destVertexId
+     */
+    public void setDestVertexId(I destVertexId) {
+        this.destVertexId = destVertexId;
+    }
+
+    /**
+     * Get the edge value of the edge
+     * 
+     * @return Edge value of this edge
+     */
+    public E getEdgeValue() {
+        return edgeValue;
+    }
+
+    /**
+     * set the edge of value
+     * 
+     * @param edgeValue
+     */
+    public void setEdgeValue(E edgeValue) {
+        this.edgeValue = edgeValue;
+        if (edgeValue != null)
+            hasEdgeValue = true;
+    }
+
+    @Override
+    public String toString() {
+        return "(DestVertexIndex = " + destVertexId + ", edgeValue = " + edgeValue + ")";
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        if (destVertexId == null)
+            destVertexId = (I) BspUtils.createVertexIndex(getConf());
+        destVertexId.readFields(input);
+        hasEdgeValue = input.readBoolean();
+        if (hasEdgeValue) {
+            if (edgeValue == null)
+                edgeValue = (E) BspUtils.createEdgeValue(getConf());
+            edgeValue.readFields(input);
+        }
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        if (destVertexId == null) {
+            throw new IllegalStateException("write: Null destination vertex index");
+        }
+        destVertexId.write(output);
+        output.writeBoolean(hasEdgeValue);
+        if (hasEdgeValue)
+            edgeValue.write(output);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public boolean equals(Edge<I, E> edge) {
+        return this.destVertexId.equals(edge.getDestVertexId());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Object o) {
+        Edge<I, E> edge = (Edge<I, E>) o;
+        return destVertexId.compareTo(edge.getDestVertexId());
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
new file mode 100644
index 0000000..57261f9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that allows the message class to be
+ * set prior to calling readFields().
+ * 
+ * @param <M>
+ *            message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 100L;
+
+    /**
+     * Default constructor.
+     */
+    public MsgList() {
+        super();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setClass() {
+        setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
new file mode 100644
index 0000000..6a77349
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.delegate.VertexDelegate;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * User applications should all subclass {@link Vertex}. Package access should
+ * prevent users from accessing internal methods.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** Map of destination vertices and their edge values */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static Mapper.Context context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
+
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
+
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     */
+    public abstract void compute(Iterator<M> msgIterator);
+
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        return true;
+    }
+
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
+
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+    }
+
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
+
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedEdge);
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
+
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedEdge);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
+
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
+
+    public final I getVertexId() {
+        return vertexId;
+    }
+
+    /**
+     * Set the global superstep folr all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
+
+    public static long getCurrentSuperstep() {
+        return superstep;
+    }
+
+    public final long getSuperstep() {
+        return superstep;
+    }
+
+    public final V getVertexValue() {
+        return vertexValue;
+    }
+
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
+
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
+
+    public final long getNumVertices() {
+        return numVertices;
+    }
+
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
+
+    public final long getNumEdges() {
+        return numEdges;
+    }
+
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
+
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
+
+    public final void voteToHalt() {
+        halt = true;
+    }
+
+    public final boolean isHalted() {
+        return halt;
+    }
+
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
+
+    private boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        destEdgeList.add(edge);
+        return true;
+    }
+
+    public List<M> getMsgList() {
+        return msgList;
+    }
+
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
+
+    public final Mapper<?, ?, ?, ?>.Context getContext() {
+        return context;
+    }
+
+    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+        Vertex.context = context;
+    }
+
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
+
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
+
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
+
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
+
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
+
+    public boolean hasUpdate() {
+        return this.updated;
+    }
+
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
+
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        Collections.sort((List) destEdgeList);
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
new file mode 100644
index 0000000..c54b4ea
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * interface to implement for combining of messages sent to the same vertex.
+ * 
+ * @param <I extends Writable> index
+ * @param <M extends Writable> message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexCombiner<I extends WritableComparable, M extends Writable> {
+
+    /**
+     * initialize combiner
+     */
+    public void init();
+
+    /**
+     * step call
+     * 
+     * @param vertexIndex
+     * @param msg
+     * @throws IOException
+     */
+    public void step(I vertexIndex, M msg) throws IOException;
+
+    /**
+     * finish aggregate
+     * 
+     * @return Message
+     */
+    public M finish();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
new file mode 100644
index 0000000..cfb2a5e
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data. It is used
+ * internally by BspInputFormat (which determines how many tasks to run the
+ * application on). Users should not use this directly.
+ */
+public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
+    private static final long serialVersionUID = 1L;
+    /** Number of splits */
+    private int numSplits = -1;
+    /** Split index */
+    private int splitIndex = -1;
+
+    public BasicGenInputSplit() {
+    }
+
+    public BasicGenInputSplit(int splitIndex, int numSplits) {
+        this.splitIndex = splitIndex;
+        this.numSplits = numSplits;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[] {};
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        splitIndex = in.readInt();
+        numSplits = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(splitIndex);
+        out.writeInt(numSplits);
+    }
+
+    public int getSplitIndex() {
+        return splitIndex;
+    }
+
+    public int getNumSplits() {
+        return numSplits;
+    }
+
+    @Override
+    public String toString() {
+        return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
new file mode 100644
index 0000000..4da19de
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Use this to load data for a BSP application. Note that the InputSplit must
+ * also implement Writable. The InputSplits will determine the partitioning of
+ * vertices across the mappers, so keep that in consideration when implementing
+ * getSplits().
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Logically split the vertices for a graph processing application.
+     * 
+     * Each {@link InputSplit} is then assigned to a worker for processing.
+     * 
+     * <p>
+     * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+     * input files are not physically split into chunks. For e.g. a split could
+     * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+     * also creates the {@link VertexReader} to read the {@link InputSplit}.
+     * 
+     * Also, the number of workers is a hint given to the developer to try to
+     * intelligently determine how many splits to create (if this is adjustable)
+     * at runtime.
+     * 
+     * @param context
+     *            Context of the job
+     * @param numWorkers
+     *            Number of workers used for this job
+     * @return an array of {@link InputSplit}s for the job.
+     */
+    public abstract List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException,
+            InterruptedException;
+
+    /**
+     * Create a vertex reader for a given split. The framework will call
+     * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+     * the split is used.
+     * 
+     * @param split
+     *            the split to be read
+     * @param context
+     *            the information about the task
+     * @return a new record reader
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context)
+            throws IOException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
new file mode 100644
index 0000000..4675afe
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output the graph after the computation. It is modeled directly
+ * after the Hadoop OutputFormat.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> {
+    /**
+     * Create a vertex writer for a given split. The framework will call
+     * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+     * the split is used.
+     * 
+     * @param context
+     *            the information about the task
+     * @return a new vertex writer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+            InterruptedException;
+
+    /**
+     * Check for validity of the output-specification for the job. (Copied from
+     * Hadoop OutputFormat)
+     * 
+     * <p>
+     * This is to validate the output specification for the job when it is a job
+     * is submitted. Typically checks that it does not already exist, throwing
+     * an exception when it already exists, so that output is not overwritten.
+     * </p>
+     * 
+     * @param context
+     *            information about the job
+     * @throws IOException
+     *             when output should not be attempted
+     */
+    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
+
+    /**
+     * Get the output committer for this output format. This is responsible for
+     * ensuring the output is committed correctly. (Copied from Hadoop
+     * OutputFormat)
+     * 
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+            InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
new file mode 100644
index 0000000..2794231
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Analogous to {@link RecordReader} for vertices. Will read the vertices from
+ * an input split.
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex data
+ * @param <E>
+ *            Edge data
+ * @param <M>
+ *            Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Use the input split and context t o setup reading the vertices.
+     * Guaranteed to be called prior to any other function.
+     * 
+     * @param inputSplit
+     *            Input split to be used for reading vertices.
+     * @param context
+     *            Context from the task.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException;
+
+    /**
+     * 
+     * @return false iff there are no more vertices
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    boolean nextVertex() throws IOException, InterruptedException;
+
+    /**
+     * Get the current vertex.
+     * 
+     * @return the current vertex which has been read. nextVertex() should be
+     *         called first.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
+
+    /**
+     * Close this {@link VertexReader} to future operations.
+     * 
+     * @throws IOException
+     */
+    void close() throws IOException;
+
+    /**
+     * How much of the input has the {@link VertexReader} consumed i.e. has been
+     * processed by?
+     * 
+     * @return Progress from <code>0.0</code> to <code>1.0</code>.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    float getProgress() throws IOException, InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
new file mode 100644
index 0000000..a7654a9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Implement to output a vertex range of the graph after the computation
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexWriter<I extends WritableComparable, V extends Writable, E extends Writable> {
+    /**
+     * Use the context to setup writing the vertices. Guaranteed to be called
+     * prior to any other function.
+     * 
+     * @param context
+     *            Context used to write the vertices.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void initialize(TaskAttemptContext context) throws IOException, InterruptedException;
+
+    /**
+     * Writes the next vertex and associated data
+     * 
+     * @param vertex
+     *            set the properties of this vertex
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException;
+
+    /**
+     * Close this {@link VertexWriter} to future operations.
+     * 
+     * @param context
+     *            the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void close(TaskAttemptContext context) throws IOException, InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..12924d2
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging. It simply generates
+ * some vertex data that can be consumed by test applications.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // This is meaningless, the VertexReader will generate all the test
+        // data.
+        List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+        for (int i = 0; i < numWorkers; ++i) {
+            inputSplitList.add(new BasicGenInputSplit(i, numWorkers));
+        }
+        return inputSplitList;
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
new file mode 100644
index 0000000..fc4182b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Used by GeneratedVertexInputFormat to read some generated data
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        implements VertexReader<I, V, E, M> {
+    /** Records read so far */
+    protected long recordsRead = 0;
+    /** Total records to read (on this split alone) */
+    protected long totalRecords = 0;
+    /** The input split from initialize(). */
+    protected BasicGenInputSplit inputSplit = null;
+    /** Reverse the id order? */
+    protected boolean reverseIdOrder;
+
+    protected Configuration configuration = null;
+
+    public static final String READER_VERTICES = "GeneratedVertexReader.reader_vertices";
+    public static final long DEFAULT_READER_VERTICES = 10;
+    public static final String REVERSE_ID_ORDER = "GeneratedVertexReader.reverseIdOrder";
+    public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+
+    public GeneratedVertexReader() {
+    }
+
+    @Override
+    final public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
+        configuration = context.getConfiguration();
+        totalRecords = configuration.getLong(GeneratedVertexReader.READER_VERTICES,
+                GeneratedVertexReader.DEFAULT_READER_VERTICES);
+        reverseIdOrder = configuration.getBoolean(GeneratedVertexReader.REVERSE_ID_ORDER,
+                GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+        this.inputSplit = (BasicGenInputSplit) inputSplit;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    final public float getProgress() throws IOException {
+        return recordsRead * 100.0f / totalRecords;
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
new file mode 100644
index 0000000..4c804db
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+    /** Uses the TextInputFormat to do everything */
+    protected TextInputFormat textInputFormat = new TextInputFormat();
+
+    /**
+     * Abstract class to be implemented by the user based on their specific
+     * vertex input. Easiest to ignore the key value separator and only use key
+     * instead.
+     * 
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
+        /** Internal line record reader */
+        private final RecordReader<LongWritable, Text> lineRecordReader;
+        /** Context passed to initialize */
+        private TaskAttemptContext context;
+
+        /**
+         * Initialize with the LineRecordReader.
+         * 
+         * @param lineRecordReader
+         *            Line record reader from TextInputFormat
+         */
+        public TextVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+            this.lineRecordReader = lineRecordReader;
+        }
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            lineRecordReader.initialize(inputSplit, context);
+            this.context = context;
+        }
+
+        @Override
+        public void close() throws IOException {
+            lineRecordReader.close();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return lineRecordReader.getProgress();
+        }
+
+        /**
+         * Get the line record reader.
+         * 
+         * @return Record reader to be used for reading.
+         */
+        protected RecordReader<LongWritable, Text> getRecordReader() {
+            return lineRecordReader;
+        }
+
+        /**
+         * Get the context.
+         * 
+         * @return Context passed to initialize.
+         */
+        protected TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // Ignore the hint of numWorkers here since we are using TextInputFormat
+        // to do this for us
+        return textInputFormat.getSplits(context);
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
new file mode 100644
index 0000000..c026945
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+    /** Uses the TextOutputFormat to do everything */
+    protected TextOutputFormat<Text, Text> textOutputFormat = new TextOutputFormat<Text, Text>();
+
+    /**
+     * Abstract class to be implemented by the user based on their specific
+     * vertex output. Easiest to ignore the key value separator and only use key
+     * instead.
+     * 
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public static abstract class TextVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+            implements VertexWriter<I, V, E> {
+        /** Context passed to initialize */
+        private TaskAttemptContext context;
+        /** Internal line record writer */
+        private final RecordWriter<Text, Text> lineRecordWriter;
+
+        /**
+         * Initialize with the LineRecordWriter.
+         * 
+         * @param lineRecordWriter
+         *            Line record writer from TextOutputFormat
+         */
+        public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            this.lineRecordWriter = lineRecordWriter;
+        }
+
+        @Override
+        public void initialize(TaskAttemptContext context) throws IOException {
+            this.context = context;
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            lineRecordWriter.close(context);
+        }
+
+        /**
+         * Get the line record writer.
+         * 
+         * @return Record writer to be used for writing.
+         */
+        public RecordWriter<Text, Text> getRecordWriter() {
+            return lineRecordWriter;
+        }
+
+        /**
+         * Get the context.
+         * 
+         * @return Context passed to initialize.
+         */
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        textOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return textOutputFormat.getOutputCommitter(context);
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
new file mode 100644
index 0000000..f9a05cb
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+
+/**
+ * Limits the functions that can be called by the user. Job is too flexible for
+ * our needs. For instance, our job should not have any reduce tasks.
+ */
+public class PregelixJob extends Job {
+    /** Vertex class - required */
+    public static final String VERTEX_CLASS = "giraph.vertexClass";
+    /** VertexInputFormat class - required */
+    public static final String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+    /** VertexOutputFormat class - optional */
+    public static final String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+    /** Vertex combiner class - optional */
+    public static final String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+    /** Vertex resolver class - optional */
+    public static final String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+    /** Vertex index class */
+    public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+    /** Vertex value class */
+    public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+    /** Edge value class */
+    public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+    /** Message value class */
+    public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+    /** Aggregator class */
+    public static final String AGGREGATOR_NAME = "giraph.aggregatorClass";
+    /** num of vertices */
+    public static final String NUM_VERTICE = "giraph.numVertices";
+    /** num of edges */
+    public static final String NUM_EDGES = "giraph.numEdges";
+
+    /**
+     * Constructor that will instantiate the configuration
+     * 
+     * @param jobName
+     *            User-defined job name
+     * @throws IOException
+     */
+    public PregelixJob(String jobName) throws IOException {
+        super(new Configuration(), jobName);
+    }
+
+    /**
+     * Constructor.
+     * 
+     * @param conf
+     *            User-defined configuration
+     * @param jobName
+     *            User-defined job name
+     * @throws IOException
+     */
+    public PregelixJob(Configuration conf, String jobName) throws IOException {
+        super(conf, jobName);
+    }
+
+    /**
+     * Set the vertex class (required)
+     * 
+     * @param vertexClass
+     *            Runs vertex computation
+     */
+    final public void setVertexClass(Class<?> vertexClass) {
+        getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+    }
+
+    /**
+     * Set the vertex input format class (required)
+     * 
+     * @param vertexInputFormatClass
+     *            Determines how graph is input
+     */
+    final public void setVertexInputFormatClass(Class<?> vertexInputFormatClass) {
+        getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, vertexInputFormatClass, VertexInputFormat.class);
+    }
+
+    /**
+     * Set the vertex output format class (optional)
+     * 
+     * @param vertexOutputFormatClass
+     *            Determines how graph is output
+     */
+    final public void setVertexOutputFormatClass(Class<?> vertexOutputFormatClass) {
+        getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, vertexOutputFormatClass, VertexOutputFormat.class);
+    }
+
+    /**
+     * Set the vertex combiner class (optional)
+     * 
+     * @param vertexCombinerClass
+     *            Determines how vertex messages are combined
+     */
+    final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+        getConfiguration().setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, VertexCombiner.class);
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
new file mode 100644
index 0000000..47a6c4c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A Writable for ListArray containing instances of a class.
+ */
+public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M> implements Writable, Configurable {
+    /** Used for instantiation */
+    private Class<M> refClass = null;
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 1L;
+    /** Configuration */
+    private Configuration conf;
+    /** content object pool */
+    private List<M> pool = new ArrayList<M>();
+    /** how many instance in the pool has been used */
+    private int used = 0;
+
+    /**
+     * Using the default constructor requires that the user implement
+     * setClass(), guaranteed to be invoked prior to instantiation in
+     * readFields()
+     */
+    public ArrayListWritable() {
+    }
+
+    /**
+     * This constructor allows setting the refClass during construction.
+     * 
+     * @param refClass
+     *            internal type class
+     */
+    public ArrayListWritable(Class<M> refClass) {
+        super();
+        this.refClass = refClass;
+    }
+
+    /**
+     * This is a one-time operation to set the class type
+     * 
+     * @param refClass
+     *            internal type class
+     */
+    public void setClass(Class<M> refClass) {
+        if (this.refClass != null) {
+            throw new RuntimeException("setClass: refClass is already set to " + this.refClass.getName());
+        }
+        this.refClass = refClass;
+    }
+
+    /**
+     * Subclasses must set the class type appropriately and can use
+     * setClass(Class<M> refClass) to do it.
+     */
+    public abstract void setClass();
+
+    public void readFields(DataInput in) throws IOException {
+        if (this.refClass == null) {
+            setClass();
+        }
+        used = 0;
+        this.clear();
+        int numValues = in.readInt(); // read number of values
+        ensureCapacity(numValues);
+        for (int i = 0; i < numValues; i++) {
+            M value = allocateValue();
+            value.readFields(in); // read a value
+            add(value); // store it in values
+        }
+    }
+
+    public void write(DataOutput out) throws IOException {
+        int numValues = size();
+        out.writeInt(numValues); // write number of values
+        for (int i = 0; i < numValues; i++) {
+            get(i).write(out);
+        }
+    }
+
+    public final Configuration getConf() {
+        return conf;
+    }
+
+    public final void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    private M allocateValue() {
+        if (used >= pool.size()) {
+            M value = ReflectionUtils.newInstance(refClass, conf);
+            pool.add(value);
+            used++;
+            return value;
+        } else {
+            M value = pool.get(used);
+            used++;
+            return value;
+        }
+    }
+
+    public void reset(ArrayIterator<M> iterator) {
+        iterator.reset(this);
+    }
+
+    public static class ArrayIterator<M> implements Iterator<M> {
+
+        private int pos = 0;
+        private List<M> list;
+
+        private void reset(List<M> list) {
+            this.list = list;
+            pos = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return pos < list.size();
+        }
+
+        @Override
+        public M next() {
+            M item = list.get(pos);
+            pos++;
+            return item;
+        }
+
+        @Override
+        public void remove() {
+            throw new IllegalStateException("should not be called");
+        }
+
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
new file mode 100644
index 0000000..9c31646
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Help to use the configuration to get the appropriate classes or instantiate
+ * them.
+ */
+public class BspUtils {
+    private static Configuration defaultConf = null;
+
+    public static void setDefaultConfiguration(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexInputFormat}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex input format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
+                null, VertexInputFormat.class);
+    }
+
+    /**
+     * Create a user vertex input format class
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex input format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+            Configuration conf) {
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
+        VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+        return inputFormat;
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexOutputFormat}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex output format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> Class<? extends VertexOutputFormat<I, V, E>> getVertexOutputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexOutputFormat<I, V, E>>) conf.getClass(PregelixJob.VERTEX_OUTPUT_FORMAT_CLASS,
+                null, VertexOutputFormat.class);
+    }
+
+    /**
+     * Create a user vertex output format class
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex output format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> VertexOutputFormat<I, V, E> createVertexOutputFormat(
+            Configuration conf) {
+        Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = getVertexOutputFormatClass(conf);
+        return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexCombiner}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex combiner class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, M extends Writable> Class<? extends VertexCombiner<I, M>> getVertexCombinerClass(
+            Configuration conf) {
+        return (Class<? extends VertexCombiner<I, M>>) conf.getClass(PregelixJob.VERTEX_COMBINER_CLASS, null,
+                VertexCombiner.class);
+    }
+
+    /**
+     * Create a user vertex combiner class
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex combiner class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, M extends Writable> VertexCombiner<I, M> createVertexCombiner(
+            Configuration conf) {
+        Class<? extends VertexCombiner<I, M>> vertexCombinerClass = getVertexCombinerClass(conf);
+        return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed Vertex.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+            Configuration conf) {
+        return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
+    }
+
+    /**
+     * Create a user vertex
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+            Configuration conf) {
+        Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+        Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
+        return vertex;
+    }
+
+    /**
+     * Get the user's subclassed vertex index class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex index class
+     */
+    @SuppressWarnings("unchecked")
+    public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
+    }
+
+    /**
+     * Create a user vertex index
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex index
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable> I createVertexIndex(Configuration conf) {
+        Class<I> vertexClass = getVertexIndexClass(conf);
+        try {
+            return vertexClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexIndex: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexIndex: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex value class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends Writable> Class<V> getVertexValueClass(Configuration conf) {
+        return (Class<V>) conf.getClass(PregelixJob.VERTEX_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex value
+     */
+    public static <V extends Writable> V createVertexValue(Configuration conf) {
+        Class<V> vertexValueClass = getVertexValueClass(conf);
+        try {
+            return vertexValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed edge value class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex edge value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <E extends Writable> Class<E> getEdgeValueClass(Configuration conf) {
+        return (Class<E>) conf.getClass(PregelixJob.EDGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user edge value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user edge value
+     */
+    public static <E extends Writable> E createEdgeValue(Configuration conf) {
+        Class<E> edgeValueClass = getEdgeValueClass(conf);
+        try {
+            return edgeValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createEdgeValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createEdgeValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex message value class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex message value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex message value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex message value
+     */
+    public static <M extends Writable> M createMessageValue(Configuration conf) {
+        Class<M> messageValueClass = getMessageValueClass(conf);
+        try {
+            return messageValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+        }
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
new file mode 100644
index 0000000..3742afa
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FrameTupleUtils {
+
+    public static void flushTuple(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    public static void flushTuplesFinal(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
new file mode 100644
index 0000000..d599f34
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods to get type arguments to generic classes. Courtesy of Ian
+ * Robertson (overstock.com). Make sure to use with abstract generic classes,
+ * not interfaces.
+ */
+public class ReflectionUtils {
+    /**
+     * Do not instantiate.
+     */
+    private ReflectionUtils() {
+    }
+
+    /**
+     * Get the underlying class for a type, or null if the type is a variable
+     * type.
+     * 
+     * @param type
+     *            the type
+     * @return the underlying class
+     */
+    public static Class<?> getClass(Type type) {
+        if (type instanceof Class) {
+            return (Class<?>) type;
+        } else if (type instanceof ParameterizedType) {
+            return getClass(((ParameterizedType) type).getRawType());
+        } else if (type instanceof GenericArrayType) {
+            Type componentType = ((GenericArrayType) type).getGenericComponentType();
+            Class<?> componentClass = getClass(componentType);
+            if (componentClass != null) {
+                return Array.newInstance(componentClass, 0).getClass();
+            } else {
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Get the actual type arguments a child class has used to extend a generic
+     * base class.
+     * 
+     * @param <T>
+     *            Type to evaluate.
+     * @param baseClass
+     *            the base class
+     * @param childClass
+     *            the child class
+     * @return a list of the raw classes for the actual type arguments.
+     */
+    public static <T> List<Class<?>> getTypeArguments(Class<T> baseClass, Class<? extends T> childClass) {
+        Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
+        Type type = childClass;
+        // start walking up the inheritance hierarchy until we hit baseClass
+        while (!getClass(type).equals(baseClass)) {
+            if (type instanceof Class) {
+                // there is no useful information for us in raw types,
+                // so just keep going.
+                type = ((Class<?>) type).getGenericSuperclass();
+            } else {
+                ParameterizedType parameterizedType = (ParameterizedType) type;
+                Class<?> rawType = (Class<?>) parameterizedType.getRawType();
+
+                Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+                TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
+                for (int i = 0; i < actualTypeArguments.length; i++) {
+                    resolvedTypes.put(typeParameters[i], actualTypeArguments[i]);
+                }
+
+                if (!rawType.equals(baseClass)) {
+                    type = rawType.getGenericSuperclass();
+                }
+            }
+        }
+
+        // finally, for each actual type argument provided to baseClass,
+        // determine (if possible)
+        // the raw class for that type argument.
+        Type[] actualTypeArguments;
+        if (type instanceof Class) {
+            actualTypeArguments = ((Class<?>) type).getTypeParameters();
+        } else {
+            actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
+        }
+        List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
+        // resolve types by chasing down type variables.
+        for (Type baseType : actualTypeArguments) {
+            while (resolvedTypes.containsKey(baseType)) {
+                baseType = resolvedTypes.get(baseType);
+            }
+            typeArgumentsAsClasses.add(getClass(baseType));
+        }
+        return typeArgumentsAsClasses;
+    }
+
+    /**
+     * Try to directly set a (possibly private) field on an Object.
+     * 
+     * @param target
+     *            Target to set the field on.
+     * @param fieldname
+     *            Name of field.
+     * @param value
+     *            Value to set on target.
+     */
+    public static void setField(Object target, String fieldname, Object value) throws NoSuchFieldException,
+            IllegalAccessException {
+        Field field = findDeclaredField(target.getClass(), fieldname);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /**
+     * Find a declared field in a class or one of its super classes
+     * 
+     * @param inClass
+     *            Class to search for declared field.
+     * @param fieldname
+     *            Field name to search for
+     * @return Field or will throw.
+     * @throws NoSuchFieldException
+     *             When field not found.
+     */
+    private static Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+        while (!Object.class.equals(inClass)) {
+            for (Field field : inClass.getDeclaredFields()) {
+                if (field.getName().equalsIgnoreCase(fieldname)) {
+                    return field;
+                }
+            }
+            inClass = inClass.getSuperclass();
+        }
+        throw new NoSuchFieldException();
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
new file mode 100644
index 0000000..3e917ec
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class SerDeUtils {
+
+    public static byte[] serialize(Writable object) throws IOException {
+        ByteArrayOutputStream bbos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(bbos);
+        object.write(output);
+        return bbos.toByteArray();
+    }
+
+    public static void deserialize(Writable object, byte[] buffer) throws IOException {
+        ByteArrayInputStream bbis = new ByteArrayInputStream(buffer);
+        DataInput input = new DataInputStream(bbis);
+        object.readFields(input);
+    }
+
+    public static long readVLong(DataInput in) throws IOException {
+        int vLen = 0;
+        long value = 0L;
+        while (true) {
+            byte b = (byte) in.readByte();
+            ++vLen;
+            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
+            if ((b & 0x80) == 0) {
+                break;
+            }
+        }
+        return value;
+    }
+
+    public static void writeVLong(DataOutput out, long value) throws IOException {
+        long data = value;
+        do {
+            byte b = (byte) (data & 0x7f);
+            data >>= 7;
+            if (data != 0) {
+                b |= 0x80;
+            }
+            out.write(b);
+        } while (data != 0);
+    }
+
+    public static long readVLong(byte[] data, int start, int length) {
+        int vLen = 0;
+        long value = 0L;
+        while (true) {
+            byte b = (byte) data[start];
+            ++vLen;
+            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
+            if ((b & 0x80) == 0) {
+                break;
+            }
+            ++start;
+        }
+        if (vLen != length)
+            throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
+        return value;
+    }
+
+}