Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-api/pom.xml b/fullstack/pregelix/pregelix-api/pom.xml
new file mode 100644
index 0000000..0d87602
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>pregelix-api</artifactId>
+	<name>pregelix-api</name>
+
+	<parent>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<artifactId>pregelix</artifactId>
+		<version>0.2.2-SNAPSHOT</version>
+	</parent>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.7.2</version>
+				<configuration>
+					<forkMode>pertest</forkMode>
+					<argLine>-enableassertions -Xmx512m -Dfile.encoding=UTF-8
+						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+					<includes>
+						<include>**/*TestSuite.java</include>
+						<include>**/*Test.java</include>
+					</includes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>.</directory>
+							<includes>
+								<include>teststore*</include>
+								<include>edu*</include>
+								<include>actual*</include>
+								<include>build*</include>
+								<include>expect*</include>
+								<include>ClusterController*</include>
+								<include>edu.uci.*</include>
+							</includes>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<dependencies>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-dataflow-common</artifactId>
+			<version>0.2.2-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>0.20.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
new file mode 100644
index 0000000..4af35fe
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * The Edge class represents an outgoing edge inside a {@link Vertex} object.
+ * 
+ * @param <I>
+ *            Vertex index
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class Edge<I extends WritableComparable, E extends Writable> implements Writable, Configurable, Comparable {
+    /** Destination vertex id */
+    private I destVertexId = null;
+    /** Edge value */
+    private E edgeValue = null;
+    /** Configuration - Used to instantiate classes */
+    private Configuration conf = null;
+    /** Whether the edgeValue field is not null*/
+    private boolean hasEdgeValue = false;
+
+    /**
+     * Constructor for reflection
+     */
+    public Edge() {
+    }
+
+    /**
+     * Create the edge with final values
+     * 
+     * @param destVertexId
+     * @param edgeValue
+     */
+    public Edge(I destVertexId, E edgeValue) {
+        this.destVertexId = destVertexId;
+        this.edgeValue = edgeValue;
+        if (edgeValue != null)
+            hasEdgeValue = true;
+    }
+
+    /**
+     * Get the destination vertex index of this edge
+     * 
+     * @return Destination vertex index of this edge
+     */
+    public I getDestVertexId() {
+        return destVertexId;
+    }
+
+    /**
+     * set the destination vertex id
+     * 
+     * @param destVertexId
+     */
+    public void setDestVertexId(I destVertexId) {
+        this.destVertexId = destVertexId;
+    }
+
+    /**
+     * Get the edge value of the edge
+     * 
+     * @return Edge value of this edge
+     */
+    public E getEdgeValue() {
+        return edgeValue;
+    }
+
+    /**
+     * Set the edge value; a null value marks the edge as having no value.
+     * 
+     * @param edgeValue
+     *            the new edge value, may be null
+     */
+    public void setEdgeValue(E edgeValue) {
+        this.edgeValue = edgeValue;
+        this.hasEdgeValue = edgeValue != null; // also clear the flag, or write() dereferences null
+    }
+
+    @Override
+    public String toString() {
+        return "(DestVertexIndex = " + destVertexId + ", edgeValue = " + edgeValue + ")";
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        if (destVertexId == null)
+            destVertexId = (I) BspUtils.createVertexIndex(getConf());
+        destVertexId.readFields(input);
+        hasEdgeValue = input.readBoolean();
+        if (hasEdgeValue) {
+            if (edgeValue == null)
+                edgeValue = (E) BspUtils.createEdgeValue(getConf());
+            edgeValue.readFields(input);
+        }
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        if (destVertexId == null) {
+            throw new IllegalStateException("write: Null destination vertex index");
+        }
+        destVertexId.write(output);
+        output.writeBoolean(hasEdgeValue);
+        if (hasEdgeValue)
+            edgeValue.write(output);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public boolean equals(Edge<I, E> edge) {
+        return this.destVertexId.equals(edge.getDestVertexId());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Object o) {
+        Edge<I, E> edge = (Edge<I, E>) o;
+        return destVertexId.compareTo(edge.getDestVertexId());
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
new file mode 100644
index 0000000..cb27249
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
+ * <p>
+ * The global aggregation of vertices in a distributed cluster includes two phases:
+ * 1. a local phase which aggregates vertices sent from a single machine and produces
+ * the partially aggregated state;
+ * 2. a final phase which aggregates all partially aggregated states
+ * 
+ * @param <I> vertex identifier type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ * @param <M> message type
+ * @param <P>
+ *        the type of the partial aggregate state
+ * @param <F> the type of the final aggregate value
+ */
+
+@SuppressWarnings("rawtypes")
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+    /**
+     * initialize aggregator
+     */
+    public abstract void init();
+
+    /**
+     * step through each vertex at a slave partition
+     * 
+     * @param v
+     *            the vertex to aggregate
+     * @throws HyracksDataException
+     */
+    public abstract void step(Vertex<I, V, E, M> v) throws HyracksDataException;
+
+    /**
+     * step through each intermediate aggregate result
+     * 
+     * @param partialResult
+     *            partial aggregate value
+     */
+    public abstract void step(P partialResult);
+
+    /**
+     * finish partial aggregate
+     * 
+     * @return the partial aggregate value
+     */
+    public abstract P finishPartial();
+
+    /**
+     * finish final aggregate
+     * 
+     * @return the final aggregate value
+     */
+    public abstract F finishFinal();
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
new file mode 100644
index 0000000..e4f8ef9
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is the abstract class to implement for combining messages that are sent to the same vertex.
+ * <p>
+ * This is similar to the concept of Combiner in Hadoop. The combining of messages in a distributed
+ * cluster includes two phases:
+ * 1. a local phase which combines messages sent from a single machine and produces
+ * the partially combined message;
+ * 2. a final phase which combines messages at each receiver machine after the repartitioning (shuffling)
+ * and produces the final combined message
+ * 
+ * @param <I> vertex identifier
+ * @param <M> message body type
+ * @param <P>
+ *        the type of the partially combined messages
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+
+    /**
+     * initialize combiner
+     * 
+     * @param providedMsgList
+     *            the provided msg list for user implementation to update, which *should* be returned
+     *            by the finishFinal() method
+     */
+    public abstract void init(MsgList providedMsgList);
+
+    /**
+     * step call for local combiner
+     * 
+     * @param vertexIndex
+     *            the receiver vertex identifier
+     * @param msg
+     *            a single message body
+     * @throws HyracksDataException
+     */
+    public abstract void stepPartial(I vertexIndex, M msg) throws HyracksDataException;
+
+    /**
+     * step call for global combiner
+     * 
+     * @param vertexIndex
+     *            the receiver vertex identifier
+     * @param partialAggregate
+     *            the partial aggregate value
+     * @throws HyracksDataException
+     */
+    public abstract void stepFinal(I vertexIndex, P partialAggregate) throws HyracksDataException;
+
+    /**
+     * finish partial combiner
+     * 
+     * @return the intermediate combined message of type P
+     */
+    public abstract P finishPartial();
+
+    /**
+     * finish final combiner
+     * 
+     * @return the final message List
+     */
+    public abstract MsgList<M> finishFinal();
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
new file mode 100644
index 0000000..734b1af
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that allows the message class to be
+ * set prior to calling readFields().
+ * 
+ * @param <M>
+ *            message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 100L;
+
+    /**
+     * Default constructor.
+     */
+    public MsgList() {
+        super();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setClass() {
+        setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
new file mode 100644
index 0000000..6856e9a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * User applications should all inherit {@link Vertex}, and implement their own
+ * *compute* method.
+ * 
+ * @param <I>
+ *            Vertex identifier type
+ * @param <V>
+ *            Vertex value type
+ * @param <E>
+ *            Edge value type
+ * @param <M>
+ *            Message value type
+ */
+@SuppressWarnings("rawtypes")
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** List of outgoing edges (destination vertex id and edge value) */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static Mapper.Context context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
+
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
+
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     *            an iterator of incoming messages
+     */
+    public abstract void compute(Iterator<M> msgIterator);
+
+    /**
+     * Add an edge for the vertex.
+     * 
+     * @param targetVertexId
+     * @param edgeValue
+     * @return successful or not
+     */
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        return true;
+    }
+
+    /**
+     * Initialize a new vertex
+     * 
+     * @param vertexId
+     * @param vertexValue
+     * @param edges
+     * @param messages
+     */
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
+
+    /**
+     * reset a vertex object: clear its internal states
+     */
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+    }
+
+    /** Hand out the next edge object from edgePool, growing the pool when it is exhausted. */
+    private Edge<I, E> allocateEdge() {
+        if (usedEdge < edgePool.size()) {
+            // reuse an edge object that an earlier call already put into the pool
+            return edgePool.get(usedEdge++);
+        }
+        // pool exhausted: create a fresh edge and remember it for later reuse
+        Edge<I, E> fresh = new Edge<I, E>();
+        edgePool.add(fresh);
+        usedEdge++;
+        return fresh;
+    }
+
+    /** Hand out the next message object from msgPool, growing the pool when it is exhausted. */
+    private M allocateMessage() {
+        if (usedMessage < msgPool.size()) {
+            // Index with the message cursor: usedEdge is the cursor of edgePool and
+            // would return the wrong object or run past the end of msgPool.
+            return msgPool.get(usedMessage++);
+        }
+        M message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+        msgPool.add(message);
+        usedMessage++;
+        return message;
+    }
+
+    /** Hand out the next value object from valuePool, growing the pool when it is exhausted. */
+    private V allocateValue() {
+        if (usedValue < valuePool.size()) {
+            // Index with the value cursor: usedEdge is the cursor of edgePool and
+            // would return the wrong object or run past the end of valuePool.
+            return valuePool.get(usedValue++);
+        }
+        V value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+        valuePool.add(value);
+        usedValue++;
+        return value;
+    }
+
+    /**
+     * Set the vertex id
+     * 
+     * @param vertexId
+     */
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
+
+    /**
+     * Get the vertex id
+     * 
+     * @return vertex id
+     */
+    public final I getVertexId() {
+        return vertexId;
+    }
+
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
+
+    public static long getCurrentSuperstep() {
+        return superstep;
+    }
+
+    public final long getSuperstep() {
+        return superstep;
+    }
+
+    public final V getVertexValue() {
+        return vertexValue;
+    }
+
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
+
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
+
+    public final long getNumVertices() {
+        return numVertices;
+    }
+
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
+
+    public final long getNumEdges() {
+        return numEdges;
+    }
+
+    /***
+     * Send a message to a specific vertex
+     * 
+     * @param id
+     *            the receiver vertex id
+     * @param msg
+     *            the message
+     */
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
+
+    /**
+     * Send a message to all direct outgoing neighbors
+     * 
+     * @param msg
+     *            the message
+     */
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
+
+    /**
+     * Vote to halt. Once all vertex vote to halt and no more messages, a
+     * Pregelix job will terminate.
+     */
+    public final void voteToHalt() {
+        halt = true;
+    }
+
+    /**
+     * @return the vertex is halted (true) or not (false)
+     */
+    public final boolean isHalted() {
+        return halt;
+    }
+
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
+
+    private boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        destEdgeList.add(edge);
+        return true;
+    }
+
+    /**
+     * Get the list of incoming messages
+     * 
+     * @return the list of messages
+     */
+    public List<M> getMsgList() {
+        return msgList;
+    }
+
+    /**
+     * Get outgoing edge list
+     * 
+     * @return a list of outgoing edges
+     */
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
+
+    public final Mapper<?, ?, ?, ?>.Context getContext() {
+        return context;
+    }
+
+    public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+        Vertex.context = context;
+    }
+
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
+
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
+
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
+
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
+
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
+
+    public boolean hasUpdate() {
+        return this.updated;
+    }
+
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
+
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        Collections.sort((List) destEdgeList);
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
new file mode 100644
index 0000000..7267f30
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+
+@SuppressWarnings("rawtypes")
+class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private Vertex vertex = null;
+
+    /** message tuple builder */
+    private ArrayTupleBuilder message;
+    private IFrameWriter msgWriter;
+    private FrameTupleAppender appenderMsg;
+
+    /** alive tuple builder */
+    private ArrayTupleBuilder alive;
+    private IFrameWriter aliveWriter;
+    private FrameTupleAppender appenderAlive;
+
+    /** message list */
+    private MsgList dummyMessageList = new MsgList();
+    /** whether alive message should be pushed out */
+    private boolean pushAlive;
+
+    public VertexDelegate(Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public void finishCompute() throws IOException {
+        // package alive info
+        if (pushAlive && !vertex.isHalted()) {
+            alive.reset();
+            DataOutput outputAlive = alive.getDataOutput();
+            vertexId.write(outputAlive);
+            alive.addFieldEndOffset();
+            dummyMessageList.write(outputAlive);
+            alive.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+        }
+    }
+
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+
+        /**
+         * send out message along message channel
+         */
+        try {
+            message.reset();
+            DataOutput outputMsg = message.getDataOutput();
+            id.write(outputMsg);
+            message.addFieldEndOffset();
+            msg.write(outputMsg);
+            message.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderMsg, message, msgWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public final void setVertex(Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+    }
+
+    public final void setOutputWriters(List<IFrameWriter> outputs) {
+        msgWriter = outputs.get(0);
+        if (outputs.size() > 1) {
+            aliveWriter = outputs.get(1);
+            pushAlive = true;
+        }
+    }
+
+    public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        appenderMsg = appenders.get(0);
+        if (appenders.size() > 1) {
+            appenderAlive = appenders.get(1);
+        }
+    }
+
+    public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        message = tbs.get(0);
+        if (tbs.size() > 1) {
+            alive = tbs.get(1);
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
new file mode 100644
index 0000000..7179737
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data. It is used
+ * internally by BspInputFormat (which determines how many tasks to run the
+ * application on). Users should not use this directly.
+ */
+public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
+    private static final long serialVersionUID = 1L;
+    /** Number of splits */
+    private int numSplits = -1;
+    /** Split index */
+    private int splitIndex = -1;
+    /** Creates a split whose index and count are both -1 until {@link #readFields(DataInput)} sets them. */
+    public BasicGenInputSplit() {
+    }
+    /** Creates split number {@code splitIndex} out of {@code numSplits} splits. */
+    public BasicGenInputSplit(int splitIndex, int numSplits) {
+        this.splitIndex = splitIndex;
+        this.numSplits = numSplits;
+    }
+    /** Always returns 0. */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return 0;
+    }
+    /** Always returns an empty array: no location is reported. */
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[] {};
+    }
+    /** Reads the split index and then the number of splits, as two ints. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        splitIndex = in.readInt();
+        numSplits = in.readInt();
+    }
+    /** Writes the split index and then the number of splits, as two ints (the order readFields expects). */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(splitIndex);
+        out.writeInt(numSplits);
+    }
+    /** Returns the index of this split. */
+    public int getSplitIndex() {
+        return splitIndex;
+    }
+    /** Returns the total number of splits. */
+    public int getNumSplits() {
+        return numSplits;
+    }
+    /** Returns the class name, index and count enclosed in a matching pair of single quotes. */
+    @Override
+    public String toString() {
+        return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits() + "'";
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
new file mode 100644
index 0000000..98076fd
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Use this to load data for a BSP application. Note that the InputSplit must
+ * also implement Writable. The InputSplits will determine the partitioning of
+ * vertices across the mappers, so keep that in consideration when implementing
+ * getSplits().
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Logically split the vertices for a graph processing application.
+     * Each {@link InputSplit} is then assigned to a worker for processing.
+     * <p>
+     * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the input files are not physically split into chunks. E.g. a split could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat also creates the {@link VertexReader} to read the {@link InputSplit}. Also, the number of workers is a hint given to the developer to try to intelligently determine how many splits to create (if this is adjustable) at runtime.
+     *
+     * @param context
+     *            Context of the job
+     * @param numWorkers
+     *            Number of workers used for this job
+     * @return a list of {@link InputSplit}s for the job.
+     */
+    public abstract List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException,
+            InterruptedException;
+
+    /**
+     * Create a vertex reader for a given split. The framework will call {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+     * the split is used.
+     *
+     * @param split
+     *            the split to be read
+     * @param context
+     *            the information about the task
+     * @return a new vertex reader
+     * @throws IOException
+     *             if creating the reader fails
+     */
+    public abstract VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context)
+            throws IOException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
new file mode 100644
index 0000000..6a761a6
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output the graph after the computation. It is modeled directly
+ * after the Hadoop OutputFormat.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> {
+    /**
+     * Create a vertex writer for the given task. The framework will call {@link VertexWriter#initialize(TaskAttemptContext)} before
+     * the writer is used.
+     *
+     * @param context
+     *            the information about the task
+     * @return a new vertex writer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+            InterruptedException;
+
+    /**
+     * Check for validity of the output-specification for the job. (Copied from
+     * Hadoop OutputFormat)
+     * <p>
+     * This is to validate the output specification for the job when the job is submitted. Typically checks that the output does not already exist, throwing an exception when it does, so that output is not overwritten.
+     * </p>
+     *
+     * @param context
+     *            information about the job
+     * @throws IOException
+     *             when output should not be attempted
+     */
+    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
+
+    /**
+     * Get the output committer for this output format. This is responsible for
+     * ensuring the output is committed correctly. (Copied from Hadoop
+     * OutputFormat)
+     *
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+            InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
new file mode 100644
index 0000000..3e899b8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Analogous to {@link RecordReader} for vertices. Will read the vertices from
+ * an input split.
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex data
+ * @param <E>
+ *            Edge data
+ * @param <M>
+ *            Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+    /**
+     * Use the input split and context to set up reading the vertices.
+     * Guaranteed to be called prior to any other function.
+     *
+     * @param inputSplit
+     *            Input split to be used for reading vertices.
+     * @param context
+     *            Context from the task.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException;
+
+    /**
+     * @return false iff there are no more vertices
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    boolean nextVertex() throws IOException, InterruptedException;
+
+    /**
+     * Get the current vertex.
+     *
+     * @return the current vertex which has been read. nextVertex() should be
+     *         called first.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
+
+    /**
+     * Close this {@link VertexReader} to future operations.
+     *
+     * @throws IOException
+     */
+    void close() throws IOException;
+
+    /**
+     * Report how much of the input this {@link VertexReader} has consumed,
+     * i.e. has already processed.
+     *
+     * @return Progress from <code>0.0</code> to <code>1.0</code>.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    float getProgress() throws IOException, InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
new file mode 100644
index 0000000..fcad020
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Implement to output a vertex range of the graph after the computation
+ * 
+ * @param <I>
+ *            Vertex id
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexWriter<I extends WritableComparable, V extends Writable, E extends Writable> {
+    /**
+     * Use the context to set up writing the vertices. Guaranteed to be called
+     * prior to any other function.
+     *
+     * @param context
+     *            Context used to write the vertices.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void initialize(TaskAttemptContext context) throws IOException, InterruptedException;
+
+    /**
+     * Writes the next vertex and associated data
+     *
+     * @param vertex
+     *            the vertex to write
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException;
+
+    /**
+     * Close this {@link VertexWriter} to future operations.
+     *
+     * @param context
+     *            the context of the task
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void close(TaskAttemptContext context) throws IOException, InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..7e7825a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging. It simply generates
+ * some vertex data that can be consumed by test applications.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+    /** Returns {@code numWorkers} placeholder splits, numbered 0 to {@code numWorkers - 1}. */
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // The splits carry no data; the VertexReader will generate all the test data.
+        final List<InputSplit> placeholders = new ArrayList<InputSplit>();
+        int splitIndex = 0;
+        while (splitIndex < numWorkers) {
+            placeholders.add(new BasicGenInputSplit(splitIndex++, numWorkers));
+        }
+        return placeholders;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
new file mode 100644
index 0000000..370583b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Used by GeneratedVertexInputFormat to read some generated data
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        implements VertexReader<I, V, E, M> {
+    /** Records read so far */
+    protected long recordsRead = 0;
+    /** Total records to read (on this split alone) */
+    protected long totalRecords = 0;
+    /** The input split from initialize(). */
+    protected BasicGenInputSplit inputSplit = null;
+    /** Reverse the id order? */
+    protected boolean reverseIdOrder;
+    /** Job configuration captured in initialize(). */
+    protected Configuration configuration = null;
+    // Configuration keys read by initialize(), each followed by its default value.
+    public static final String READER_VERTICES = "GeneratedVertexReader.reader_vertices";
+    public static final long DEFAULT_READER_VERTICES = 10;
+    public static final String REVERSE_ID_ORDER = "GeneratedVertexReader.reverseIdOrder";
+    public static final boolean DEAFULT_REVERSE_ID_ORDER = false; // misspelled name is public API; kept as is
+
+    public GeneratedVertexReader() {
+    }
+    /** Loads totalRecords and reverseIdOrder from the task configuration and keeps the split, cast to BasicGenInputSplit. */
+    @Override
+    final public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
+        configuration = context.getConfiguration();
+        totalRecords = configuration.getLong(GeneratedVertexReader.READER_VERTICES,
+                GeneratedVertexReader.DEFAULT_READER_VERTICES);
+        reverseIdOrder = configuration.getBoolean(GeneratedVertexReader.REVERSE_ID_ORDER,
+                GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+        this.inputSplit = (BasicGenInputSplit) inputSplit;
+    }
+    /** Does nothing: this base class holds no resources of its own. */
+    @Override
+    public void close() throws IOException {
+    }
+    /** Returns recordsRead / totalRecords clamped to [0.0, 1.0]; 0.0 when totalRecords is not positive. */
+    @Override
+    final public float getProgress() throws IOException {
+        return totalRecords <= 0 ? 0.0f : Math.max(0.0f, Math.min(1.0f, (float) recordsRead / totalRecords));
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
new file mode 100644
index 0000000..9fcb1c6
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * input format.
+ *
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
+    /** TextInputFormat that {@link #getSplits(JobContext, int)} delegates to */
+    protected TextInputFormat textInputFormat = new TextInputFormat();
+
+    /**
+     * Base class for user-defined text vertex readers; it wraps a record
+     * reader of (LongWritable, Text) records and keeps the task context.
+     *
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     * @param <M> Message value
+     */
+    public abstract static class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
+        /** Record reader that initialize(), close() and getProgress() forward to */
+        private final RecordReader<LongWritable, Text> delegate;
+        /** Task context remembered by initialize() */
+        private TaskAttemptContext taskContext;
+
+        /**
+         * Wrap the given record reader.
+         *
+         * @param lineRecordReader
+         *            Line record reader from TextInputFormat
+         */
+        public TextVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+            delegate = lineRecordReader;
+        }
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            delegate.initialize(inputSplit, context);
+            taskContext = context;
+        }
+
+        @Override
+        public void close() throws IOException {
+            delegate.close();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return delegate.getProgress();
+        }
+
+        /**
+         * Expose the wrapped record reader to subclasses.
+         *
+         * @return Record reader to be used for reading.
+         */
+        protected RecordReader<LongWritable, Text> getRecordReader() {
+            return delegate;
+        }
+
+        /**
+         * Expose the task context to subclasses.
+         *
+         * @return Context passed to initialize.
+         */
+        protected TaskAttemptContext getContext() {
+            return taskContext;
+        }
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+        // numWorkers is only a hint and is ignored here: the wrapped
+        // TextInputFormat decides the splits.
+        return textInputFormat.getSplits(context);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
new file mode 100644
index 0000000..355b9f8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+    /** TextOutputFormat that output-spec checks and committer lookups delegate to */
+    protected TextOutputFormat<Text, Text> textOutputFormat = new TextOutputFormat<Text, Text>();
+
+    /**
+     * Base class for user-defined text vertex writers; it holds a
+     * (Text, Text) record writer and keeps the task context. Easiest to ignore
+     * the key value separator and only use key instead.
+     *
+     * @param <I>
+     *            Vertex index value
+     * @param <V>
+     *            Vertex value
+     * @param <E>
+     *            Edge value
+     */
+    public abstract static class TextVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+            implements VertexWriter<I, V, E> {
+        /** Record writer handed in at construction */
+        private final RecordWriter<Text, Text> writer;
+        /** Task context remembered by initialize() */
+        private TaskAttemptContext taskContext;
+
+        /**
+         * Wrap the given record writer.
+         *
+         * @param lineRecordWriter
+         *            Line record writer from TextOutputFormat
+         */
+        public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            writer = lineRecordWriter;
+        }
+
+        @Override
+        public void initialize(TaskAttemptContext context) throws IOException {
+            taskContext = context;
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            writer.close(context);
+        }
+
+        /**
+         * Expose the wrapped record writer.
+         *
+         * @return Record writer to be used for writing.
+         */
+        public RecordWriter<Text, Text> getRecordWriter() {
+            return writer;
+        }
+
+        /**
+         * Expose the task context.
+         *
+         * @return Context passed to initialize.
+         */
+        public TaskAttemptContext getContext() {
+            return taskContext;
+        }
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        textOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return textOutputFormat.getOutputCommitter(context);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
new file mode 100644
index 0000000..6ef7e13
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+
+/**
+ * This class represents a Pregelix job.
+ */
+public class PregelixJob extends Job {
+    /** Configuration key: vertex class (required) */
+    public static final String VERTEX_CLASS = "pregelix.vertexClass";
+    /** Configuration key: VertexInputFormat class (required) */
+    public static final String VERTEX_INPUT_FORMAT_CLASS = "pregelix.vertexInputFormatClass";
+    /** Configuration key: VertexOutputFormat class (optional) */
+    public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
+    /** Configuration key: message combiner class (optional) */
+    public static final String Message_COMBINER_CLASS = "pregelix.combinerClass";
+    /** Configuration key: global aggregator class (optional) */
+    public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
+    /** Configuration key: vertex resolver class (optional) */
+    public static final String VERTEX_RESOLVER_CLASS = "pregelix.vertexResolverClass";
+    /** Configuration key: vertex index class */
+    public static final String VERTEX_INDEX_CLASS = "pregelix.vertexIndexClass";
+    /** Configuration key: vertex value class */
+    public static final String VERTEX_VALUE_CLASS = "pregelix.vertexValueClass";
+    /** Configuration key: edge value class */
+    public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
+    /** Configuration key: message value class */
+    public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
+    /** Configuration key: partial combine value class */
+    public static final String PARTIAL_COMBINE_VALUE_CLASS = "pregelix.partialCombinedValueClass";
+    /** Configuration key: partial aggregate value class */
+    public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
+    /** Configuration key: final aggregate value class */
+    public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
+    /** Configuration key: number of vertices */
+    public static final String NUM_VERTICE = "pregelix.numVertices";
+    /** Configuration key: number of edges */
+    public static final String NUM_EDGES = "pregelix.numEdges";
+    /** Configuration key: job id */
+    public static final String JOB_ID = "pregelix.jobid";
+
+    /**
+     * Creates a job backed by a freshly instantiated configuration.
+     * 
+     * @param jobName
+     *            User-defined job name
+     * @throws IOException propagated from the Job constructor
+     */
+    public PregelixJob(String jobName) throws IOException {
+        super(new Configuration(), jobName);
+    }
+
+    /**
+     * Creates a job on top of a caller-supplied configuration.
+     * 
+     * @param conf
+     *            User-defined configuration
+     * @param jobName
+     *            User-defined job name
+     * @throws IOException propagated from the Job constructor
+     */
+    public PregelixJob(Configuration conf, String jobName) throws IOException {
+        super(conf, jobName);
+    }
+
+    /**
+     * Registers the vertex class under VERTEX_CLASS (required)
+     * 
+     * @param vertexClass
+     *            Runs vertex computation
+     */
+    public final void setVertexClass(Class<?> vertexClass) {
+        getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+    }
+
+    /**
+     * Registers the vertex input format class under VERTEX_INPUT_FORMAT_CLASS (required)
+     * 
+     * @param vertexInputFormatClass
+     *            Determines how graph is input
+     */
+    public final void setVertexInputFormatClass(Class<?> vertexInputFormatClass) {
+        getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, vertexInputFormatClass, VertexInputFormat.class);
+    }
+
+    /**
+     * Registers the vertex output format class under VERTEX_OUTPUT_FORMAT_CLASS (optional)
+     * 
+     * @param vertexOutputFormatClass
+     *            Determines how graph is output
+     */
+    public final void setVertexOutputFormatClass(Class<?> vertexOutputFormatClass) {
+        getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, vertexOutputFormatClass, VertexOutputFormat.class);
+    }
+
+    /**
+     * Registers the message combiner class under Message_COMBINER_CLASS (optional)
+     * 
+     * @param vertexCombinerClass
+     *            Determines how vertex messages are combined
+     */
+    public final void setMessageCombinerClass(Class<?> vertexCombinerClass) {
+        getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
+    }
+
+    /**
+     * Registers the global aggregator class under GLOBAL_AGGREGATOR_CLASS (optional)
+     * 
+     * @param globalAggregatorClass
+     *            Global aggregator implementation to use
+     */
+    public final void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
+        getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
+    }
+
+    /**
+     * Stores the job id in the configuration under JOB_ID
+     * 
+     * @param jobId
+     *            Job identifier to record
+     */
+    public final void setJobId(String jobId) {
+        getConfiguration().set(JOB_ID, jobId);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
new file mode 100644
index 0000000..d2ba28d
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * A Writable for ListArray containing instances of a class.
+ */
+public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M> implements Writable, Configurable {
+    /** Element class, used to instantiate pooled elements */
+    private Class<M> refClass = null;
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 1L;
+    /** Configuration handed to newly instantiated elements */
+    private Configuration conf;
+    /** Pool of element instances reused across clearElements()/readFields() calls */
+    private List<M> pool = new ArrayList<M>();
+    /** how many instance in the pool has been used */
+    private int used = 0;
+    /** intermediate buffer for copy data element */
+    private final ArrayBackedValueStorage intermediateBuffer = new ArrayBackedValueStorage();
+    /** data output obtained from the intermediate buffer */
+    private final DataOutput intermediateOutput = intermediateBuffer.getDataOutput();
+    /** input stream that addElement re-points at the intermediate buffer */
+    private final ResetableByteArrayInputStream inputStream = new ResetableByteArrayInputStream();
+    /** data input reading from inputStream */
+    private final DataInput dataInput = new DataInputStream(inputStream);
+
+    /**
+     * Using the default constructor requires that the user implement
+     * setClass(), guaranteed to be invoked prior to instantiation in
+     * readFields()
+     */
+    public ArrayListWritable() {
+    }
+
+    /**
+     * Remove all elements and mark every pooled instance as reusable
+     */
+    public void clearElements() {
+        this.used = 0;
+        this.clear();
+    }
+
+    /**
+     * Add copies of all elements from another list, in order
+     * 
+     * @param list
+     *            the list of M
+     * @return always true; a failed copy surfaces as IllegalStateException
+     */
+    public boolean addAllElements(List<M> list) {
+        for (int i = 0, count = list.size(); i < count; i++) { // size captured once so passing this list terminates
+            addElement(list.get(i));
+        }
+        return true;
+    }
+
+    /**
+     * Add a copy of an element: the element is written to the intermediate
+     * buffer and read back into a pooled instance, which is then appended
+     * @param element
+     *            M
+     * @return always true; any failure is rethrown as IllegalStateException
+     */
+    public boolean addElement(M element) {
+        try {
+            intermediateBuffer.reset();
+            element.write(intermediateOutput);
+            inputStream.setByteArray(intermediateBuffer.getByteArray(), 0);
+            M value = allocateValue();
+            value.readFields(dataInput);
+            add(value);
+            return true;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * This constructor allows setting the refClass during construction.
+     * 
+     * @param refClass
+     *            internal type class
+     */
+    public ArrayListWritable(Class<M> refClass) {
+        super();
+        this.refClass = refClass;
+    }
+
+    /**
+     * This is a one-time operation to set the class type
+     * 
+     * @param refClass
+     *            internal type class; rejected with RuntimeException if a class is already set
+     */
+    public void setClass(Class<M> refClass) {
+        if (this.refClass != null) {
+            throw new RuntimeException("setClass: refClass is already set to " + this.refClass.getName());
+        }
+        this.refClass = refClass;
+    }
+
+    /**
+     * Subclasses must set the class type appropriately and can use
+     * setClass(Class<M> refClass) to do it.
+     */
+    public abstract void setClass();
+
+    /**
+     * Replace the current content with elements read from in: an int element count
+     * followed by that many elements. A negative count is rejected with IOException.
+     */
+    public void readFields(DataInput in) throws IOException {
+        if (this.refClass == null) {
+            setClass();
+        }
+        used = 0;
+        this.clear();
+        int numValues = in.readInt(); // read number of values
+        if (numValues < 0) {
+            throw new IOException("readFields: negative element count " + numValues);
+        }
+        for (int i = 0; i < numValues; i++) {
+            M value = allocateValue();
+            value.readFields(in); // read a value
+            add(value); // store it in values
+        }
+    }
+
+    /**
+     * Write the list as an int element count followed by each element in order.
+     */
+    public void write(DataOutput out) throws IOException {
+        int numValues = size();
+        out.writeInt(numValues); // write number of values
+        for (int i = 0; i < numValues; i++) {
+            get(i).write(out);
+        }
+    }
+
+    public final Configuration getConf() {
+        return conf;
+    }
+
+    /** Store the configuration and, if the element class is still unset, let setClass() set it. */
+    public final void setConf(Configuration conf) {
+        this.conf = conf;
+        if (this.refClass == null) {
+            setClass();
+        }
+    }
+
+    /** Hand out the next pooled instance, creating and pooling a new one when the pool is used up. */
+    private M allocateValue() {
+        if (used >= pool.size()) {
+            pool.add(ReflectionUtils.newInstance(refClass, conf));
+        }
+        return pool.get(used++);
+    }
+
+    /** Rebind the given iterator to the start of this list so one iterator object can be reused. */
+    public void reset(ArrayIterator<M> iterator) {
+        iterator.reset(this);
+    }
+
+    public static class ArrayIterator<M> implements Iterator<M> {
+        private int pos = 0;
+        private List<M> list;
+
+        private void reset(List<M> list) {
+            this.list = list;
+            pos = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return pos < list.size();
+        }
+
+        @Override
+        public M next() {
+            if (pos >= list.size()) { // Iterator contract: exhausted iteration must raise NoSuchElementException
+                throw new java.util.NoSuchElementException("no element at position " + pos);
+            }
+            return list.get(pos++);
+        }
+
+        @Override
+        public void remove() {
+            throw new IllegalStateException("should not be called");
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
new file mode 100644
index 0000000..7c4853f
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Help to use the configuration to get the appropriate classes or instantiate
+ * them.
+ */
+public class BspUtils {
+    private static Configuration defaultConf = null; // fallback for getters called with a null conf
+
+    /** Set the configuration used as fallback by the getters that tolerate a null conf argument. */
+    public static void setDefaultConfiguration(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexInputFormat}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex input format class, or null if none is configured
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
+                null, VertexInputFormat.class);
+    }
+
+    /**
+     * Create a user vertex input format class
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex input format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+            Configuration conf) {
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
+        return ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexOutputFormat}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex output format class, or null if none is configured
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> Class<? extends VertexOutputFormat<I, V, E>> getVertexOutputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexOutputFormat<I, V, E>>) conf.getClass(PregelixJob.VERTEX_OUTPUT_FORMAT_CLASS,
+                null, VertexOutputFormat.class);
+    }
+
+    /**
+     * Create a user vertex output format class
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex output format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> VertexOutputFormat<I, V, E> createVertexOutputFormat(
+            Configuration conf) {
+        Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = getVertexOutputFormatClass(conf);
+        return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed {@link MessageCombiner}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's message combiner class (DefaultMessageCombiner when none is configured)
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+            Configuration conf) {
+        return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
+                DefaultMessageCombiner.class, MessageCombiner.class);
+    }
+
+    /**
+     * Get the user's subclassed {@link GlobalAggregator}.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's global aggregator class (GlobalCountAggregator when none is configured)
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+            Configuration conf) {
+        return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
+                GlobalCountAggregator.class, GlobalAggregator.class);
+    }
+
+    /** Get the job id stored under PregelixJob.JOB_ID in the given configuration. */
+    public static String getJobId(Configuration conf) {
+        return conf.get(PregelixJob.JOB_ID);
+    }
+
+    /**
+     * Create a user message combiner
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user message combiner
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+            Configuration conf) {
+        Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
+        return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+    }
+
+    /**
+     * Create a global aggregator
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated global aggregator
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+            Configuration conf) {
+        Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
+        return ReflectionUtils.newInstance(globalAggregatorClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed Vertex.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex class, or null if none is configured
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+            Configuration conf) {
+        return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
+    }
+
+    /**
+     * Create a user vertex
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+            Configuration conf) {
+        Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+        return ReflectionUtils.newInstance(vertexClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed vertex index class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's vertex index class
+     */
+    @SuppressWarnings("unchecked")
+    public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
+    }
+
+    /**
+     * Create a user vertex index
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex index
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable> I createVertexIndex(Configuration conf) {
+        Class<I> vertexClass = getVertexIndexClass(conf);
+        try {
+            return vertexClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexIndex: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexIndex: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's vertex value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends Writable> Class<V> getVertexValueClass(Configuration conf) {
+        return (Class<V>) (conf == null ? defaultConf : conf).getClass(PregelixJob.VERTEX_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex value
+     */
+    public static <V extends Writable> V createVertexValue(Configuration conf) {
+        Class<V> vertexValueClass = getVertexValueClass(conf);
+        try {
+            return vertexValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed edge value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's vertex edge value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <E extends Writable> Class<E> getEdgeValueClass(Configuration conf) {
+        return (Class<E>) (conf == null ? defaultConf : conf).getClass(PregelixJob.EDGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user edge value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user edge value
+     */
+    public static <E extends Writable> E createEdgeValue(Configuration conf) {
+        Class<E> edgeValueClass = getEdgeValueClass(conf);
+        try {
+            return edgeValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createEdgeValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createEdgeValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex message value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's vertex message value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed global aggregator's partial aggregate value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's partial aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed combiner's partial combine value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's partial combine value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed global aggregator's global value class.
+     * 
+     * @param conf
+     *            Configuration to check; null falls back to the default configuration
+     * @return User's final aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex message value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex message value
+     */
+    public static <M extends Writable> M createMessageValue(Configuration conf) {
+        Class<M> messageValueClass = getMessageValueClass(conf);
+        try {
+            return messageValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user partial aggregate value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user partial aggregate value
+     */
+    public static <M extends Writable> M createPartialAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getPartialAggregateValueClass(conf);
+        try {
+            return aggregateValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createPartialAggregateValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createPartialAggregateValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user partial combine value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user partial combine value
+     */
+    @SuppressWarnings("rawtypes")
+    public static <M extends Writable> M createPartialCombineValue(Configuration conf) {
+        Class<M> aggregateValueClass = getPartialCombineValueClass(conf);
+        try {
+            M instance = aggregateValueClass.newInstance();
+            if (instance instanceof MsgList) {
+                // set conf for msg list, if the value type is msglist
+                ((MsgList) instance).setConf(conf);
+            }
+            return instance;
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user final aggregate value
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user final aggregate value
+     */
+    public static <M extends Writable> M createFinalAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getFinalAggregateValueClass(conf);
+        try {
+            return aggregateValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createFinalAggregateValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createFinalAggregateValue: Illegally accessed", e);
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
new file mode 100644
index 0000000..1468431
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+        MessageCombiner<I, M, MsgList> { // combiner that keeps every message by collecting them into a MsgList
+    private MsgList<M> msgList; // list being filled; supplied through init(), never allocated here
+
+    @Override
+    public void init(MsgList providedMsgList) {
+        this.msgList = providedMsgList; // adopt the caller's list as the accumulator
+        this.msgList.clearElements(); // start from an empty list
+    }
+
+    @Override
+    public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
+        msgList.addElement(msg); // vertexIndex is not consulted; the message is simply appended
+    }
+
+    @Override
+    public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
+        msgList.addAllElements(partialAggregate); // append every message of the partial list
+    }
+
+    @Override
+    public MsgList finishPartial() {
+        return msgList; // the accumulator itself is returned, not a copy
+    }
+
+    @Override
+    public MsgList<M> finishFinal() {
+        return msgList; // same accumulator object as finishPartial()
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
new file mode 100644
index 0000000..402249e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Static helpers for appending tuples to a frame and handing frames to a
+ * writer.
+ */
+public class FrameTupleUtils {
+
+    /**
+     * Appends the tuple held by {@code tb} to the appender's frame. When the
+     * first append is rejected, the current frame is flushed to {@code writer},
+     * the appender is reset on the same buffer, and the append is retried once.
+     * 
+     * @param appender
+     *            appender for the frame being filled
+     * @param tb
+     *            builder holding the tuple to append
+     * @param writer
+     *            receiver of the flushed frame
+     * @throws HyracksDataException
+     *             declared for the flush to the writer
+     * @throws IllegalStateException
+     *             if the tuple is rejected even after the reset
+     */
+    public static void flushTuple(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (tryAppend(appender, tb)) {
+            return;
+        }
+        // The frame refused the tuple: emit it and start over on the same buffer.
+        FrameUtils.flushFrame(appender.getBuffer(), writer);
+        appender.reset(appender.getBuffer(), true);
+        if (!tryAppend(appender, tb)) {
+            throw new IllegalStateException();
+        }
+    }
+
+    /**
+     * Flushes the appender's frame to {@code writer} and resets the appender on
+     * the same buffer, but only if the frame holds at least one tuple.
+     * 
+     * @param appender
+     *            appender whose frame may hold pending tuples
+     * @param writer
+     *            receiver of the flushed frame
+     * @throws HyracksDataException
+     *             declared for the flush to the writer
+     */
+    public static void flushTuplesFinal(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+        if (appender.getTupleCount() <= 0) {
+            return;
+        }
+        FrameUtils.flushFrame(appender.getBuffer(), writer);
+        appender.reset(appender.getBuffer(), true);
+    }
+
+    /**
+     * Tries to append the builder's complete tuple and reports the appender's answer.
+     */
+    private static boolean tryAppend(FrameTupleAppender appender, ArrayTupleBuilder tb)
+            throws HyracksDataException {
+        return appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
new file mode 100644
index 0000000..1c1fa92
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Global aggregator that keeps a running count: every vertex stepped over adds
+ * one, and every partial result stepped over adds its value. The four type
+ * parameters only parameterize the {@link Vertex} being counted; the counting
+ * logic does not use any of them.
+ */
+@SuppressWarnings("rawtypes")
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
+
+    /** Running total; the same instance is handed out by both finish methods. */
+    private final LongWritable count = new LongWritable(0);
+
+    /**
+     * Resets the running total to zero.
+     */
+    @Override
+    public void init() {
+        count.set(0);
+    }
+
+    /**
+     * Counts one vertex.
+     * 
+     * @param v
+     *            the vertex being counted; its contents are not inspected
+     */
+    @Override
+    public void step(Vertex<I, V, E, M> v) throws HyracksDataException {
+        long soFar = count.get();
+        count.set(soFar + 1);
+    }
+
+    /**
+     * Adds a partial count to the running total.
+     * 
+     * @param partialResult
+     *            the partial count to add
+     */
+    @Override
+    public void step(LongWritable partialResult) {
+        long soFar = count.get();
+        count.set(soFar + partialResult.get());
+    }
+
+    /**
+     * @return the running total (the internal instance, not a copy)
+     */
+    @Override
+    public LongWritable finishPartial() {
+        return count;
+    }
+
+    /**
+     * @return the running total (the internal instance, not a copy)
+     */
+    @Override
+    public LongWritable finishFinal() {
+        return count;
+    }
+
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
new file mode 100644
index 0000000..1366df1
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods to get type arguments to generic classes. Courtesy of Ian
+ * Robertson (overstock.com). Make sure to use with abstract generic classes,
+ * not interfaces.
+ */
+public class ReflectionUtils {
+    /**
+     * Do not instantiate.
+     */
+    private ReflectionUtils() {
+    }
+
+    /**
+     * Get the underlying class for a type, or null if the type is a variable
+     * type.
+     * 
+     * @param type
+     *            the type
+     * @return the underlying class; null when the type is neither a class, a
+     *         parameterized type, nor a generic array whose component type
+     *         has an underlying class
+     */
+    public static Class<?> getClass(Type type) {
+        if (type instanceof Class) {
+            return (Class<?>) type;
+        }
+        if (type instanceof ParameterizedType) {
+            return getClass(((ParameterizedType) type).getRawType());
+        }
+        if (type instanceof GenericArrayType) {
+            Class<?> componentClass = getClass(((GenericArrayType) type).getGenericComponentType());
+            if (componentClass == null) {
+                return null;
+            }
+            // a zero-length array is created only to obtain its array class
+            return Array.newInstance(componentClass, 0).getClass();
+        }
+        return null;
+    }
+
+    /**
+     * Get the actual type arguments a child class has used to extend a generic
+     * base class.
+     * 
+     * @param <T>
+     *            Type to evaluate.
+     * @param baseClass
+     *            the base class; it must be reachable from childClass through
+     *            superclasses only, so an interface does not qualify
+     * @param childClass
+     *            the child class
+     * @return a list of the raw classes for the actual type arguments; an entry
+     *         is null where the argument could not be resolved to a class
+     * @throws IllegalArgumentException
+     *             if baseClass is not found on the superclass chain of
+     *             childClass
+     */
+    public static <T> List<Class<?>> getTypeArguments(Class<T> baseClass, Class<? extends T> childClass) {
+        Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
+        Type type = childClass;
+        // start walking up the inheritance hierarchy until we hit baseClass
+        while (true) {
+            Class<?> rawClass = getClass(type);
+            if (rawClass == null) {
+                // getGenericSuperclass() yielded null, so the walk ran off the
+                // top of the hierarchy without ever meeting baseClass
+                throw new IllegalArgumentException("getTypeArguments: " + baseClass
+                        + " is not on the superclass chain of " + childClass);
+            }
+            if (rawClass.equals(baseClass)) {
+                break;
+            }
+            if (type instanceof ParameterizedType) {
+                // record what each type variable of this superclass is bound to
+                Type[] boundArguments = ((ParameterizedType) type).getActualTypeArguments();
+                TypeVariable<?>[] typeParameters = rawClass.getTypeParameters();
+                for (int i = 0; i < boundArguments.length; i++) {
+                    resolvedTypes.put(typeParameters[i], boundArguments[i]);
+                }
+            }
+            // there is no further information in the raw class itself, so
+            // just keep going.
+            type = rawClass.getGenericSuperclass();
+        }
+
+        // finally, for each actual type argument provided to baseClass,
+        // determine (if possible)
+        // the raw class for that type argument.
+        Type[] actualTypeArguments;
+        if (type instanceof Class) {
+            actualTypeArguments = ((Class<?>) type).getTypeParameters();
+        } else {
+            actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
+        }
+        List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>(actualTypeArguments.length);
+        // resolve types by chasing down type variables.
+        for (Type baseType : actualTypeArguments) {
+            while (resolvedTypes.containsKey(baseType)) {
+                baseType = resolvedTypes.get(baseType);
+            }
+            typeArgumentsAsClasses.add(getClass(baseType));
+        }
+        return typeArgumentsAsClasses;
+    }
+
+    /**
+     * Try to directly set a (possibly private) field on an Object. The field
+     * is looked up by name, ignoring case, in the target's class and then in
+     * its superclasses.
+     * 
+     * @param target
+     *            Target to set the field on.
+     * @param fieldname
+     *            Name of field.
+     * @param value
+     *            Value to set on target.
+     * @throws NoSuchFieldException
+     *             When no matching field is found.
+     * @throws IllegalAccessException
+     *             When the field cannot be written.
+     */
+    public static void setField(Object target, String fieldname, Object value) throws NoSuchFieldException,
+            IllegalAccessException {
+        Field field = findDeclaredField(target.getClass(), fieldname);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /**
+     * Find a declared field in a class or one of its super classes. Names are
+     * compared ignoring case and the first match wins; {@link Object} itself
+     * is not searched.
+     * 
+     * @param inClass
+     *            Class to search for declared field.
+     * @param fieldname
+     *            Field name to search for
+     * @return Field or will throw.
+     * @throws NoSuchFieldException
+     *             When field not found; the message names the field and the
+     *             class the search started from.
+     */
+    private static Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+        Class<?> current = inClass;
+        // the null check ends the walk for types that have no superclass
+        while (current != null && !Object.class.equals(current)) {
+            for (Field field : current.getDeclaredFields()) {
+                if (field.getName().equalsIgnoreCase(fieldname)) {
+                    return field;
+                }
+            }
+            current = current.getSuperclass();
+        }
+        throw new NoSuchFieldException("no field named '" + fieldname + "' in " + inClass + " or its superclasses");
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
new file mode 100755
index 0000000..5702642
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link InputStream} over a caller-supplied byte array that can be pointed
+ * at a new array and start position at any time through
+ * {@link #setByteArray(byte[], int)}. Reading runs from the current position
+ * to the end of the array.
+ */
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    /** Array being read; it is used directly, not copied. */
+    private byte[] data;
+    /** Index of the next byte to read. */
+    private int position;
+
+    /**
+     * Creates a stream with no array attached; call
+     * {@link #setByteArray(byte[], int)} before reading.
+     */
+    public ResetableByteArrayInputStream() {
+    }
+
+    /**
+     * Re-targets the stream.
+     * 
+     * @param data
+     *            the array to read from
+     * @param position
+     *            index of the first byte to read
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Reads the next byte.
+     * 
+     * @return the byte as a value in 0..255, or -1 when the position is at or
+     *         past the end of the array
+     */
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /**
+     * Copies up to {@code length} of the remaining bytes into {@code bytes}.
+     * 
+     * @param bytes
+     *            destination array
+     * @param offset
+     *            index in {@code bytes} of the first byte written
+     * @param length
+     *            maximum number of bytes to copy
+     * @return the number of bytes copied, or -1 when the position is at or
+     *         past the end of the array
+     */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        // "<= 0" rather than "== 0": a start position beyond the array must
+        // read as end of stream instead of reaching arraycopy with a negative
+        // count.
+        if (remaining <= 0) {
+            return -1;
+        }
+        int count = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, count);
+        position += count;
+        return count;
+    }
+
+    /**
+     * @return the number of bytes left between the position and the end of
+     *         the array; 0 when no array is attached or the position is past
+     *         the end
+     */
+    @Override
+    public int available() {
+        if (data == null) {
+            return 0;
+        }
+        return Math.max(0, data.length - position);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
new file mode 100755
index 0000000..307a3ae
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link OutputStream} that writes into a caller-supplied, fixed-size byte
+ * array starting at a caller-chosen position. The array is never grown; a
+ * write that does not fit fails with {@link IndexOutOfBoundsException}.
+ */
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    /** Array being written; it is used directly, not copied. */
+    private byte[] data;
+    /** Index at which the next byte is stored. */
+    private int position;
+
+    /**
+     * Creates a stream with no array attached; call
+     * {@link #setByteArray(byte[], int)} before writing.
+     */
+    public ResetableByteArrayOutputStream() {
+    }
+
+    /**
+     * Re-targets the stream.
+     * 
+     * @param data
+     *            the array to write into
+     * @param position
+     *            index at which the first byte is stored
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Stores the low-order byte of {@code b} at the current position.
+     * 
+     * @param b
+     *            the value whose low eight bits are written
+     * @throws IndexOutOfBoundsException
+     *             if the array has no room left
+     */
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        // every slot up to and including the last one is writable
+        if (remaining < 1) {
+            throw new IndexOutOfBoundsException("write(): no room left, position: " + position + " capacity: "
+                    + data.length);
+        }
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    /**
+     * Copies {@code length} bytes of {@code bytes}, starting at {@code offset},
+     * to the current position.
+     * 
+     * @param bytes
+     *            source array
+     * @param offset
+     *            index in {@code bytes} of the first byte copied
+     * @param length
+     *            number of bytes to copy
+     * @throws IndexOutOfBoundsException
+     *             if fewer than {@code length} slots are left in the array
+     */
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        // compared by subtraction so that a huge length cannot overflow the
+        // sum position + length
+        if (length > data.length - position) {
+            throw new IndexOutOfBoundsException("write(bytes[], int, int): " + length
+                    + " bytes do not fit, position: " + position + " capacity: " + data.length);
+        }
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
new file mode 100644
index 0000000..32c21ac
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Serialization helpers: round-tripping a {@link Writable} through a byte
+ * array, and a variable-length encoding of {@code long} values that stores
+ * seven payload bits per byte, least-significant group first, with the high
+ * bit of each byte set when another byte follows.
+ */
+public class SerDeUtils {
+
+    /** A 64-bit value needs at most ten groups of seven bits. */
+    private static final int MAX_VLONG_BYTES = 10;
+
+    /**
+     * Writes the object into a fresh byte array.
+     * 
+     * @param object
+     *            the object to serialize
+     * @return the bytes produced by {@code object.write}
+     * @throws IOException
+     *             if the object fails to write itself
+     */
+    public static byte[] serialize(Writable object) throws IOException {
+        ByteArrayOutputStream bbos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(bbos);
+        object.write(output);
+        return bbos.toByteArray();
+    }
+
+    /**
+     * Populates the object from a byte array.
+     * 
+     * @param object
+     *            the object whose {@code readFields} is invoked
+     * @param buffer
+     *            the serialized form, read from index 0
+     * @throws IOException
+     *             if the object fails to read itself
+     */
+    public static void deserialize(Writable object, byte[] buffer) throws IOException {
+        ByteArrayInputStream bbis = new ByteArrayInputStream(buffer);
+        DataInput input = new DataInputStream(bbis);
+        object.readFields(input);
+    }
+
+    /**
+     * Reads a value written by {@link #writeVLong(DataOutput, long)}.
+     * 
+     * @param in
+     *            source of the encoded bytes
+     * @return the decoded value
+     * @throws IOException
+     *             if reading fails, or if no terminating byte (high bit
+     *             clear) shows up within ten bytes
+     */
+    public static long readVLong(DataInput in) throws IOException {
+        long value = 0L;
+        for (int vLen = 1; vLen <= MAX_VLONG_BYTES; vLen++) {
+            byte b = in.readByte();
+            value |= ((long) (b & 0x7f)) << ((vLen - 1) * 7);
+            if ((b & 0x80) == 0) {
+                return value;
+            }
+        }
+        // an eleventh group would need a shift of 70, which Java silently
+        // reduces modulo 64 and would corrupt the low bits
+        throw new IOException("malformed variable-length long: more than " + MAX_VLONG_BYTES + " bytes");
+    }
+
+    /**
+     * Writes a value in the variable-length encoding. Small non-negative values
+     * take few bytes (at least one); negative values always take ten bytes
+     * because their top bit is set.
+     * 
+     * @param out
+     *            destination of the encoded bytes
+     * @param value
+     *            the value to encode
+     * @throws IOException
+     *             if writing fails
+     */
+    public static void writeVLong(DataOutput out, long value) throws IOException {
+        long data = value;
+        do {
+            byte b = (byte) (data & 0x7f);
+            // unsigned shift: with ">>" a negative value settles at -1 and
+            // never reaches 0, so the loop would not terminate
+            data >>>= 7;
+            if (data != 0) {
+                b |= 0x80;
+            }
+            out.write(b);
+        } while (data != 0);
+    }
+
+    /**
+     * Decodes a value that occupies exactly {@code length} bytes of
+     * {@code data} starting at {@code start}.
+     * 
+     * @param data
+     *            array holding the encoded bytes
+     * @param start
+     *            index of the first encoded byte
+     * @param length
+     *            number of bytes the encoding is expected to occupy
+     * @return the decoded value
+     * @throws IllegalStateException
+     *             if the encoding does not end exactly on the
+     *             {@code length}-th byte
+     */
+    public static long readVLong(byte[] data, int start, int length) {
+        long value = 0L;
+        // never look past the announced length or the longest legal encoding
+        int limit = Math.min(length, MAX_VLONG_BYTES);
+        for (int vLen = 1; vLen <= limit; vLen++) {
+            byte b = data[start + vLen - 1];
+            value |= ((long) (b & 0x7f)) << ((vLen - 1) * 7);
+            if ((b & 0x80) == 0) {
+                if (vLen != length) {
+                    throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
+                }
+                return value;
+            }
+        }
+        throw new IllegalStateException("length mismatch -- no terminating byte within " + limit
+                + " bytes, length:" + length);
+    }
+
+}