Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-api/pom.xml b/fullstack/pregelix/pregelix-api/pom.xml
new file mode 100644
index 0000000..0d87602
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pregelix-api</artifactId>
+ <name>pregelix-api</name>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx512m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ <include>edu.uci.*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
new file mode 100644
index 0000000..4af35fe
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * The Edge class, represent an outgoing edge inside an {@link Vertex} object.
+ *
+ * @param <I>
+ * Vertex index
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class Edge<I extends WritableComparable, E extends Writable> implements Writable, Configurable, Comparable {
+ /** Destination vertex id */
+ private I destVertexId = null;
+ /** Edge value */
+ private E edgeValue = null;
+ /** Configuration - Used to instantiate classes */
+ private Configuration conf = null;
+ /** Whether the edgeValue field is not null*/
+ private boolean hasEdgeValue = false;
+
+ /**
+ * Constructor for reflection
+ */
+ public Edge() {
+ }
+
+    /**
+     * Builds an edge that points at the given destination vertex.
+     *
+     * @param destVertexId id of the vertex this edge leads to
+     * @param edgeValue value carried by the edge; a null value leaves the edge flagged as value-less
+     */
+    public Edge(I destVertexId, E edgeValue) {
+        // a freshly built edge starts with hasEdgeValue == false, so a direct assignment is equivalent
+        this.hasEdgeValue = (edgeValue != null);
+        this.edgeValue = edgeValue;
+        this.destVertexId = destVertexId;
+    }
+
+ /**
+ * Get the destination vertex index of this edge
+ *
+ * @return Destination vertex index of this edge
+ */
+ public I getDestVertexId() {
+ return destVertexId;
+ }
+
+ /**
+ * set the destination vertex id
+ *
+ * @param destVertexId
+ */
+ public void setDestVertexId(I destVertexId) {
+ this.destVertexId = destVertexId;
+ }
+
+ /**
+ * Get the edge value of the edge
+ *
+ * @return Edge value of this edge
+ */
+ public E getEdgeValue() {
+ return edgeValue;
+ }
+
+    /**
+     * Set the value of this edge and keep the has-value flag in sync with it.
+     *
+     * @param edgeValue the new edge value; null marks the edge as carrying no value
+     */
+    public void setEdgeValue(E edgeValue) {
+        this.edgeValue = edgeValue;
+        // Recompute the flag on every call: a stale "true" after a null set makes write() dereference null.
+        hasEdgeValue = (edgeValue != null);
+    }
+
+ @Override
+ public String toString() {
+ return "(DestVertexIndex = " + destVertexId + ", edgeValue = " + edgeValue + ")";
+ }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // holder objects are created through BspUtils only when missing, then filled from the stream
+        destVertexId = (destVertexId != null) ? destVertexId : (I) BspUtils.createVertexIndex(getConf());
+        destVertexId.readFields(input);
+        hasEdgeValue = input.readBoolean();
+        if (!hasEdgeValue) {
+            return;
+        }
+        edgeValue = (edgeValue != null) ? edgeValue : (E) BspUtils.createEdgeValue(getConf());
+        edgeValue.readFields(input);
+    }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (destVertexId == null) {
+ throw new IllegalStateException("write: Null destination vertex index");
+ }
+ destVertexId.write(output);
+ output.writeBoolean(hasEdgeValue);
+ if (hasEdgeValue)
+ edgeValue.write(output);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+    public boolean equals(Edge<I, E> edge) { // NOTE: overloads, does not override, Object.equals(Object)
+        return edge != null && this.destVertexId.equals(edge.getDestVertexId());
+    }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(Object o) {
+ Edge<I, E> edge = (Edge<I, E>) o;
+ return destVertexId.compareTo(edge.getDestVertexId());
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
new file mode 100644
index 0000000..cb27249
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
+ * <p>
+ * The global aggregation of vertices in a distributed cluster includes two phases:
+ * 1. a local phase which aggregates vertices sent from a single machine and produces
+ * the partially aggregated state;
+ * 2. a final phase which aggregates all partially aggregated states
+ *
+ * @param <I> vertex identifier type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ * @param <M> message type
+ * @param <P>
+ * the type of the partial aggregate state
+ * @param <F> the type of the final aggregate value
+ */
+
+@SuppressWarnings("rawtypes")
+public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
+ /**
+ * initialize aggregator
+ */
+ public abstract void init();
+
+ /**
+ * step through all vertices at each slave partition
+ *
+ * @param v
+ * the vertex to aggregate
+ * @throws HyracksDataException
+ */
+ public abstract void step(Vertex<I, V, E, M> v) throws HyracksDataException;
+
+ /**
+ * step through all intermediate aggregate results
+ *
+ * @param partialResult
+ * partial aggregate value
+ */
+ public abstract void step(P partialResult);
+
+ /**
+ * finish partial aggregate
+ *
+ * @return the partial aggregate state
+ */
+ public abstract P finishPartial();
+
+ /**
+ * finish final aggregate
+ *
+ * @return the final aggregate value
+ */
+ public abstract F finishFinal();
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
new file mode 100644
index 0000000..e4f8ef9
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is the abstract class to implement for combining of messages that are sent to the same vertex.
+ * <p>
+ * This is similar to the concept of Combiner in Hadoop. The combining of messages in a distributed
+ * cluster includes two phases:
+ * 1. a local phase which combines messages sent from a single machine and produces
+ * the partially combined message;
+ * 2. a final phase which combines messages at each receiver machine after the repartitioning (shuffling)
+ * and produces the final combined message
+ *
+ * @param <I> vertex identifier
+ * @param <M> message body type
+ * @param <P>
+ * the type of the partially combined messages
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
+
+ /**
+ * initialize combiner
+ *
+ * @param providedMsgList
+ * the provided msg list for user implementation to update, which *should* be returned
+ * by the finishFinal() method
+ */
+ public abstract void init(MsgList providedMsgList);
+
+ /**
+ * step call for local combiner
+ *
+ * @param vertexIndex
+ * the receiver vertex identifier
+ * @param msg
+ * a single message body
+ * @throws HyracksDataException
+ */
+ public abstract void stepPartial(I vertexIndex, M msg) throws HyracksDataException;
+
+ /**
+ * step call for global combiner
+ *
+ * @param vertexIndex
+ * the receiver vertex identifier
+ * @param partialAggregate
+ * the partial aggregate value
+ * @throws HyracksDataException
+ */
+ public abstract void stepFinal(I vertexIndex, P partialAggregate) throws HyracksDataException;
+
+ /**
+ * finish partial combiner
+ *
+ * @return the intermediate combined message of type P
+ */
+ public abstract P finishPartial();
+
+ /**
+ * finish final combiner
+ *
+ * @return the final message list
+ */
+ public abstract MsgList<M> finishFinal();
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
new file mode 100644
index 0000000..734b1af
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Wrapper around {@link ArrayListWritable} whose element class is looked up from the
+ * configuration (see {@link #setClass()}), so it can be set prior to calling readFields().
+ *
+ * @param <M>
+ * message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 100L;
+
+ /**
+ * Default constructor.
+ */
+ public MsgList() {
+ super();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() {
+ setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
new file mode 100644
index 0000000..6856e9a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * User applications should all inherit {@link Vertex}, and implement their own
+ * *compute* method.
+ *
+ * @param <I>
+ * Vertex identifier type
+ * @param <V>
+ * Vertex value type
+ * @param <E>
+ * Edge value type
+ * @param <M>
+ * Message value type
+ */
+@SuppressWarnings("rawtypes")
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements Writable {
+ private static long superstep = 0;
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** Map of destination vertices and their edge values */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context */
+ private static Mapper.Context context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
+
+ /**
+ * use object pool for re-using objects
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
+
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ * an iterator of incoming messages
+ */
+ public abstract void compute(Iterator<M> msgIterator);
+
+ /**
+ * Add an edge for the vertex.
+ *
+ * @param targetVertexId
+ * @param edgeValue
+ * @return successful or not
+ */
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ return true;
+ }
+
+ /**
+ * Initialize a new vertex
+ *
+ * @param vertexId
+ * @param vertexValue
+ * @param edges
+ * @param messages
+ */
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
+
+ /**
+ * reset a vertex object: clear its internal states
+ */
+ public void reset() {
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ }
+
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
+
+    private M allocateMessage() {
+        // Index the message pool with its own cursor; the edge cursor returned the wrong slot or overran the pool.
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedMessage);
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+        }
+        usedMessage++;
+        return message;
+    }
+
+    private V allocateValue() {
+        // Index the value pool with its own cursor (usedValue), not with the unrelated edge cursor.
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue);
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+        }
+        usedValue++;
+        return value;
+    }
+
+ /**
+ * Set the vertex id
+ *
+ * @param vertexId
+ */
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
+
+ /**
+ * Get the vertex id
+ *
+ * @return vertex id
+ */
+ public final I getVertexId() {
+ return vertexId;
+ }
+
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
+
+ public static long getCurrentSuperstep() {
+ return superstep;
+ }
+
+ public final long getSuperstep() {
+ return superstep;
+ }
+
+ public final V getVertexValue() {
+ return vertexValue;
+ }
+
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
+
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
+
+ public final long getNumVertices() {
+ return numVertices;
+ }
+
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
+
+ public final long getNumEdges() {
+ return numEdges;
+ }
+
+ /***
+ * Send a message to a specific vertex
+ *
+ * @param id
+ * the receiver vertex id
+ * @param msg
+ * the message
+ */
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
+
+ /**
+ * Send a message to all direct outgoing neighbors
+ *
+ * @param msg
+ * the message
+ */
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
+
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ */
+ public final void voteToHalt() {
+ halt = true;
+ }
+
+ /**
+ * @return the vertex is halted (true) or not (false)
+ */
+ public final boolean isHalted() {
+ return halt;
+ }
+
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ reset();
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration());
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false;
+ hasMessage = false;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
+
+ private boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ destEdgeList.add(edge);
+ return true;
+ }
+
+ /**
+ * Get the list of incoming messages
+ *
+ * @return the list of messages
+ */
+ public List<M> getMsgList() {
+ return msgList;
+ }
+
+ /**
+ * Get outgoing edge list
+ *
+ * @return a list of outgoing edges
+ */
+ public List<Edge<I, E>> getEdges() {
+ return this.destEdgeList;
+ }
+
+ public final Mapper<?, ?, ?, ?>.Context getContext() {
+ return context;
+ }
+
+ public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ Vertex.context = context;
+ }
+
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        // note: the live edge list itself is put in order here, not a copy of it
+        Collections.sort(destEdgeList);
+        StringBuilder ids = new StringBuilder("(");
+        for (Edge<I, E> outgoing : destEdgeList) {
+            ids.append(outgoing.getDestVertexId());
+            ids.append(",");
+        }
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + ids.append(")") + ")";
+    }
+
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
+
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
+
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
+
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
+
+ public boolean hasUpdate() {
+ return this.updated;
+ }
+
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
+
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void sortEdges() {
+ Collections.sort((List) destEdgeList);
+ }
+
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
new file mode 100644
index 0000000..7267f30
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+
+@SuppressWarnings("rawtypes")
+class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private Vertex vertex = null;
+
+ /** message tuple builder */
+ private ArrayTupleBuilder message;
+ private IFrameWriter msgWriter;
+ private FrameTupleAppender appenderMsg;
+
+ /** alive tuple builder */
+ private ArrayTupleBuilder alive;
+ private IFrameWriter aliveWriter;
+ private FrameTupleAppender appenderAlive;
+
+ /** message list */
+ private MsgList dummyMessageList = new MsgList();
+ /** whether alive message should be pushed out */
+ private boolean pushAlive;
+
+ public VertexDelegate(Vertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public void finishCompute() throws IOException {
+ // package alive info
+ if (pushAlive && !vertex.isHalted()) {
+ alive.reset();
+ DataOutput outputAlive = alive.getDataOutput();
+ vertexId.write(outputAlive);
+ alive.addFieldEndOffset();
+ dummyMessageList.write(outputAlive);
+ alive.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+ }
+ }
+
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+
+ /**
+ * send out message along message channel
+ */
+ try {
+ message.reset();
+ DataOutput outputMsg = message.getDataOutput();
+ id.write(outputMsg);
+ message.addFieldEndOffset();
+ msg.write(outputMsg);
+ message.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderMsg, message, msgWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public final void setVertex(Vertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ }
+
+ public final void setOutputWriters(List<IFrameWriter> outputs) {
+ msgWriter = outputs.get(0);
+ if (outputs.size() > 1) {
+ aliveWriter = outputs.get(1);
+ pushAlive = true;
+ }
+ }
+
+ public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ appenderMsg = appenders.get(0);
+ if (appenders.size() > 1) {
+ appenderAlive = appenders.get(1);
+ }
+ }
+
+ public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ message = tbs.get(0);
+ if (tbs.size() > 1) {
+ alive = tbs.get(1);
+ }
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
new file mode 100644
index 0000000..7179737
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data. It is used
+ * internally by BspInputFormat (which determines how many tasks to run the
+ * application on). Users should not use this directly.
+ */
+public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
+ private static final long serialVersionUID = 1L;
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
+
+ public BasicGenInputSplit() {
+ }
+
+ public BasicGenInputSplit(int splitIndex, int numSplits) {
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[] {};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
+
+ public int getSplitIndex() {
+ return splitIndex;
+ }
+
+ public int getNumSplits() {
+ return numSplits;
+ }
+
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
new file mode 100644
index 0000000..98076fd
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Use this to load data for a BSP application. Note that the InputSplit must
+ * also implement Writable. The InputSplits will determine the partitioning of
+ * vertices across the mappers, so keep that in consideration when implementing
+ * getSplits().
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Logically split the vertices for a graph processing application.
+ * Each {@link InputSplit} is then assigned to a worker for processing.
+ * <p>
+ * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the input files are not physically split into chunks. For example, a split could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat also creates the {@link VertexReader} to read the {@link InputSplit}. Also, the number of workers is a hint given to the developer to try to intelligently determine how many splits to create (if this is adjustable) at runtime.
+ *
+ * @param context
+ * Context of the job
+ * @param numWorkers
+ * Number of workers used for this job
+ * @return a list of {@link InputSplit}s for the job.
+ */
+ public abstract List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException,
+ InterruptedException;
+
+ /**
+ * Create a vertex reader for a given split. The framework will call {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param split
+ * the split to be read
+ * @param context
+ * the information about the task
+ * @return a new vertex reader
+ * @throws IOException
+ * if the reader cannot be created
+ */
+ public abstract VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context)
+ throws IOException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
new file mode 100644
index 0000000..6a761a6
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output the graph after the computation. It is modeled directly
+ * after the Hadoop OutputFormat.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> {
+ /**
+ * Create a vertex writer for the given task. The framework will call {@link VertexWriter#initialize(TaskAttemptContext)} before
+ * the writer is used.
+ *
+ * @param context
+ * the information about the task
+ * @return a new vertex writer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException;
+
+ /**
+ * Check for validity of the output-specification for the job. (Copied from
+ * Hadoop OutputFormat)
+ * <p>
+ * This is to validate the output specification for the job when the job is submitted. Typically checks that the output does not already exist, throwing an exception when it already exists, so that output is not overwritten.
+ * </p>
+ *
+ * @param context
+ * information about the job
+ * @throws IOException
+ * when output should not be attempted
+ */
+ public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
+
+ /**
+ * Get the output committer for this output format. This is responsible for
+ * ensuring the output is committed correctly. (Copied from Hadoop
+ * OutputFormat)
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
new file mode 100644
index 0000000..3e899b8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Analogous to {@link RecordReader} for vertices. Will read the vertices from
+ * an input split.
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex data
+ * @param <E>
+ * Edge data
+ * @param <M>
+ * Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Use the input split and context to set up reading the vertices.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param inputSplit
+ * Input split to be used for reading vertices.
+ * @param context
+ * Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * @return false iff there are no more vertices
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean nextVertex() throws IOException, InterruptedException;
+
+ /**
+ * Get the current vertex.
+ *
+ * @return the current vertex which has been read. nextVertex() should be
+ * called first.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link VertexReader} to future operations.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * Report how much of the input this {@link VertexReader} has consumed so
+ * far.
+ *
+ * @return Progress from <code>0.0</code> to <code>1.0</code>.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ float getProgress() throws IOException, InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
new file mode 100644
index 0000000..fcad020
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Implement to output a vertex range of the graph after the computation.
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexWriter<I extends WritableComparable, V extends Writable, E extends Writable> {
+ /**
+ * Use the context to set up writing the vertices. Guaranteed to be called
+ * prior to any other function.
+ *
+ * @param context
+ * Context used to write the vertices.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void initialize(TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * Writes the next vertex and associated data.
+ *
+ * @param vertex
+ * the vertex to write; its message type is not constrained
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link VertexWriter} to future operations.
+ *
+ * @param context
+ * the context of the task
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void close(TaskAttemptContext context) throws IOException, InterruptedException;
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..7e7825a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging. It simply generates
+ * some vertex data that can be consumed by test applications.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ // One split per worker; each split only records its index and the total
+ // count, since the VertexReader generates all the test data itself.
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < numWorkers; ++i) {
+ inputSplitList.add(new BasicGenInputSplit(i, numWorkers));
+ }
+ return inputSplitList;
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
new file mode 100644
index 0000000..370583b
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Used by GeneratedVertexInputFormat to read some generated data
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Records read so far */
+ protected long recordsRead = 0;
+ /** Total records to read (on this split alone) */
+ protected long totalRecords = 0;
+ /** The input split from initialize(). */
+ protected BasicGenInputSplit inputSplit = null;
+ /** Reverse the id order? */
+ protected boolean reverseIdOrder;
+
+ protected Configuration configuration = null;
+
+ public static final String READER_VERTICES = "GeneratedVertexReader.reader_vertices";
+ public static final long DEFAULT_READER_VERTICES = 10;
+ public static final String REVERSE_ID_ORDER = "GeneratedVertexReader.reverseIdOrder";
+ public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+
+ public GeneratedVertexReader() {
+ }
+
+ @Override
+ final public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
+ configuration = context.getConfiguration();
+ totalRecords = configuration.getLong(GeneratedVertexReader.READER_VERTICES,
+ GeneratedVertexReader.DEFAULT_READER_VERTICES);
+ reverseIdOrder = configuration.getBoolean(GeneratedVertexReader.REVERSE_ID_ORDER,
+ GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+ this.inputSplit = (BasicGenInputSplit) inputSplit;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ final public float getProgress() throws IOException {
+ // VertexReader documents progress as a fraction in [0.0, 1.0], not a
+ // percentage; report 0.0 when there is nothing to read (avoids 0/0 = NaN).
+ return totalRecords <= 0 ? 0.0f : Math.min(1.0f, (float) recordsRead / totalRecords);
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
new file mode 100644
index 0000000..9fcb1c6
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * input format.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Hadoop TextInputFormat that getSplits() delegates to */
+ protected TextInputFormat textInputFormat = new TextInputFormat();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Easiest to ignore the key value separator and only use key
+ * instead.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Internal line record reader */
+ private final RecordReader<LongWritable, Text> lineRecordReader;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the LineRecordReader.
+ *
+ * @param lineRecordReader
+ * Line record reader from TextInputFormat
+ */
+ public TextVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ this.lineRecordReader = lineRecordReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lineRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return lineRecordReader.getProgress();
+ }
+
+ /**
+ * Get the line record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected RecordReader<LongWritable, Text> getRecordReader() {
+ return lineRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ // Ignore the hint of numWorkers here since we are using TextInputFormat
+ // to do this for us
+ return textInputFormat.getSplits(context);
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
new file mode 100644
index 0000000..355b9f8
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+ /** Hadoop TextOutputFormat that checkOutputSpecs() and getOutputCommitter() delegate to */
+ protected TextOutputFormat<Text, Text> textOutputFormat = new TextOutputFormat<Text, Text>();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex output. Easiest to ignore the key value separator and only use key
+ * instead.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class TextVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+ implements VertexWriter<I, V, E> {
+ /** Context passed to initialize; null until initialize() is called */
+ private TaskAttemptContext context;
+ /** Internal line record writer */
+ private final RecordWriter<Text, Text> lineRecordWriter;
+
+ /**
+ * Initialize with the LineRecordWriter.
+ *
+ * @param lineRecordWriter
+ * Line record writer from TextOutputFormat
+ */
+ public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ this.lineRecordWriter = lineRecordWriter;
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ lineRecordWriter.close(context);
+ }
+
+ /**
+ * Get the line record writer.
+ *
+ * @return Record writer to be used for writing.
+ */
+ public RecordWriter<Text, Text> getRecordWriter() {
+ return lineRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize.
+ */
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ textOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return textOutputFormat.getOutputCommitter(context);
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
new file mode 100644
index 0000000..6ef7e13
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+
+/**
+ * This class represents a Pregelix job.
+ */
+public class PregelixJob extends Job {
+ /** Vertex class - required */
+ public static final String VERTEX_CLASS = "pregelix.vertexClass";
+ /** VertexInputFormat class - required */
+ public static final String VERTEX_INPUT_FORMAT_CLASS = "pregelix.vertexInputFormatClass";
+ /** VertexOutputFormat class - optional */
+ public static final String VERTEX_OUTPUT_FORMAT_CLASS = "pregelix.vertexOutputFormatClass";
+ /** Message combiner class - optional */
+ public static final String Message_COMBINER_CLASS = "pregelix.combinerClass";
+ /** Global aggregator class - optional */
+ public static final String GLOBAL_AGGREGATOR_CLASS = "pregelix.aggregatorClass";
+ /** Vertex resolver class - optional */
+ public static final String VERTEX_RESOLVER_CLASS = "pregelix.vertexResolverClass";
+ /** Vertex index class */
+ public static final String VERTEX_INDEX_CLASS = "pregelix.vertexIndexClass";
+ /** Vertex value class */
+ public static final String VERTEX_VALUE_CLASS = "pregelix.vertexValueClass";
+ /** Edge value class */
+ public static final String EDGE_VALUE_CLASS = "pregelix.edgeValueClass";
+ /** Message value class */
+ public static final String MESSAGE_VALUE_CLASS = "pregelix.messageValueClass";
+ /** Partial combiner value class */
+ public static final String PARTIAL_COMBINE_VALUE_CLASS = "pregelix.partialCombinedValueClass";
+ /** Partial aggregate value class */
+ public static final String PARTIAL_AGGREGATE_VALUE_CLASS = "pregelix.partialAggregateValueClass";
+ /** Final aggregate value class */
+ public static final String FINAL_AGGREGATE_VALUE_CLASS = "pregelix.finalAggregateValueClass";
+ /** Number of vertices */
+ public static final String NUM_VERTICE = "pregelix.numVertices";
+ /** Number of edges */
+ public static final String NUM_EDGES = "pregelix.numEdges";
+ /** Job id */
+ public static final String JOB_ID = "pregelix.jobid";
+
+ /**
+ * Constructor that will instantiate the configuration
+ *
+ * @param jobName
+ * User-defined job name
+ * @throws IOException
+ */
+ public PregelixJob(String jobName) throws IOException {
+ super(new Configuration(), jobName);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param conf
+ * User-defined configuration
+ * @param jobName
+ * User-defined job name
+ * @throws IOException
+ */
+ public PregelixJob(Configuration conf, String jobName) throws IOException {
+ super(conf, jobName);
+ }
+
+ /**
+ * Set the vertex class (required)
+ *
+ * @param vertexClass
+ * Runs vertex computation
+ */
+ final public void setVertexClass(Class<?> vertexClass) {
+ getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+ }
+
+ /**
+ * Set the vertex input format class (required)
+ *
+ * @param vertexInputFormatClass
+ * Determines how graph is input
+ */
+ final public void setVertexInputFormatClass(Class<?> vertexInputFormatClass) {
+ getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, vertexInputFormatClass, VertexInputFormat.class);
+ }
+
+ /**
+ * Set the vertex output format class (optional)
+ *
+ * @param vertexOutputFormatClass
+ * Determines how graph is output
+ */
+ final public void setVertexOutputFormatClass(Class<?> vertexOutputFormatClass) {
+ getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, vertexOutputFormatClass, VertexOutputFormat.class);
+ }
+
+ /**
+ * Set the message combiner class (optional)
+ *
+ * @param vertexCombinerClass
+ * Determines how vertex messages are combined
+ */
+ final public void setMessageCombinerClass(Class<?> vertexCombinerClass) {
+ getConfiguration().setClass(Message_COMBINER_CLASS, vertexCombinerClass, MessageCombiner.class);
+ }
+
+ /**
+ * Set the global aggregator class (optional)
+ *
+ * @param globalAggregatorClass
+ * Global aggregator implementation, stored under {@link #GLOBAL_AGGREGATOR_CLASS}
+ */
+ final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
+ getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
+ }
+
+ /**
+ * Set the job id
+ *
+ * @param jobId
+ * Identifier stored in the configuration under {@link #JOB_ID}
+ */
+ final public void setJobId(String jobId) {
+ getConfiguration().set(JOB_ID, jobId);
+ }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
new file mode 100644
index 0000000..d2ba28d
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * A Writable for ListArray containing instances of a class.
+ */
+public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M> implements Writable, Configurable {
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 1L;
+    /** Element class used for instantiation; set at most once. */
+    private Class<M> refClass = null;
+    /** Configuration passed to newly instantiated elements */
+    private Configuration conf;
+    /** content object pool; its instances are reused after clearElements()/readFields() */
+    private List<M> pool = new ArrayList<M>();
+    /** how many instances in the pool have been handed out */
+    private int used = 0;
+    /** intermediate buffer an element is serialized into while it is copied */
+    private final ArrayBackedValueStorage intermediateBuffer = new ArrayBackedValueStorage();
+    /** data output that writes into the intermediate buffer */
+    private final DataOutput intermediateOutput = intermediateBuffer.getDataOutput();
+    /** input stream that is re-pointed at the intermediate buffer for every copy */
+    private final ResetableByteArrayInputStream inputStream = new ResetableByteArrayInputStream();
+    /** data input that reads from the input stream above */
+    private final DataInput dataInput = new DataInputStream(inputStream);
+
+    /**
+     * Using the default constructor requires that the user implement
+     * setClass(), guaranteed to be invoked prior to instantiation in
+     * readFields()
+     */
+    public ArrayListWritable() {
+    }
+
+    /**
+     * Clear all the elements and make every pooled instance available again.
+     */
+    public void clearElements() {
+        this.used = 0;
+        this.clear();
+    }
+
+    /**
+     * Add copies of all elements from another list, in order.
+     *
+     * @param list
+     *            the list of M
+     * @return always true; a failed copy surfaces as an IllegalStateException
+     */
+    public boolean addAllElements(List<M> list) {
+        for (int i = 0; i < list.size(); i++) {
+            addElement(list.get(i));
+        }
+        return true;
+    }
+
+    /**
+     * Add a copy of an element: it is serialized into the intermediate
+     * buffer and read back into an instance taken from the pool.
+     *
+     * @param element
+     *            M
+     * @return always true; failures are rethrown as IllegalStateException
+     */
+    public boolean addElement(M element) {
+        try {
+            intermediateBuffer.reset();
+            element.write(intermediateOutput);
+            inputStream.setByteArray(intermediateBuffer.getByteArray(), 0);
+            M value = allocateValue();
+            value.readFields(dataInput);
+            add(value);
+            return true;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * This constructor allows setting the refClass during construction.
+     *
+     * @param refClass
+     *            internal type class
+     */
+    public ArrayListWritable(Class<M> refClass) {
+        super();
+        this.refClass = refClass;
+    }
+
+    /**
+     * This is a one-time operation to set the class type
+     *
+     * @param refClass
+     *            internal type class
+     */
+    public void setClass(Class<M> refClass) {
+        if (this.refClass != null) {
+            throw new RuntimeException("setClass: refClass is already set to " + this.refClass.getName());
+        }
+        this.refClass = refClass;
+    }
+
+    /**
+     * Subclasses must set the class type appropriately and can use
+     * setClass(Class<M> refClass) to do it.
+     */
+    public abstract void setClass();
+
+    /** Replace the contents with the count-prefixed sequence of values read from in. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        if (this.refClass == null) {
+            setClass();
+        }
+        used = 0;
+        this.clear();
+        int numValues = in.readInt(); // read number of values
+        for (int i = 0; i < numValues; i++) {
+            M value = allocateValue();
+            value.readFields(in); // read a value
+            add(value); // store it in values
+        }
+    }
+
+    /** Write the element count followed by every element, in list order. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        int numValues = size();
+        out.writeInt(numValues); // write number of values
+        for (int i = 0; i < numValues; i++) {
+            get(i).write(out);
+        }
+    }
+
+    @Override
+    public final Configuration getConf() {
+        return conf;
+    }
+
+    /** Store the configuration and make sure the element class has been set. */
+    @Override
+    public final void setConf(Configuration conf) {
+        this.conf = conf;
+        if (this.refClass == null) {
+            setClass();
+        }
+    }
+
+    /** Hand out the next pooled instance, growing the pool when it is exhausted. */
+    private M allocateValue() {
+        if (used >= pool.size()) {
+            pool.add(ReflectionUtils.newInstance(refClass, conf));
+        }
+        return pool.get(used++);
+    }
+
+    /** Rewind the given iterator and point it at this list. */
+    public void reset(ArrayIterator<M> iterator) {
+        iterator.reset(this);
+    }
+
+    /** Reusable iterator; it has no list until ArrayListWritable.reset(ArrayIterator) is called. */
+    public static class ArrayIterator<M> implements Iterator<M> {
+
+        private int pos = 0;
+        private List<M> list;
+
+        private void reset(List<M> list) {
+            this.list = list;
+            pos = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return pos < list.size();
+        }
+
+        @Override
+        public M next() {
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            return list.get(pos++);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("should not be called");
+        }
+
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
new file mode 100644
index 0000000..7c4853f
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Help to use the configuration to get the appropriate classes or instantiate
+ * them.
+ */
+public class BspUtils {
+    private static Configuration defaultConf = null;
+
+    /** Set the fallback configuration used by the getters that accept a null conf. */
+    public static void setDefaultConfiguration(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexInputFormat}.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex input format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexInputFormat<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_INPUT_FORMAT_CLASS,
+                null, VertexInputFormat.class);
+    }
+
+    /**
+     * Create a user vertex input format class
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex input format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+            Configuration conf) {
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(conf);
+        return ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexOutputFormat}.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex output format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> Class<? extends VertexOutputFormat<I, V, E>> getVertexOutputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexOutputFormat<I, V, E>>) conf.getClass(PregelixJob.VERTEX_OUTPUT_FORMAT_CLASS,
+                null, VertexOutputFormat.class);
+    }
+
+    /**
+     * Create a user vertex output format class
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex output format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> VertexOutputFormat<I, V, E> createVertexOutputFormat(
+            Configuration conf) {
+        Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = getVertexOutputFormatClass(conf);
+        return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed {@link MessageCombiner}.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's message combiner class, DefaultMessageCombiner if none is set
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> Class<? extends MessageCombiner<I, M, P>> getMessageCombinerClass(
+            Configuration conf) {
+        return (Class<? extends MessageCombiner<I, M, P>>) conf.getClass(PregelixJob.Message_COMBINER_CLASS,
+                DefaultMessageCombiner.class, MessageCombiner.class);
+    }
+
+    /**
+     * Get the user's subclassed {@link GlobalAggregator}.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's global aggregator class, GlobalCountAggregator if none is set
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> Class<? extends GlobalAggregator<I, V, E, M, P, F>> getGlobalAggregatorClass(
+            Configuration conf) {
+        return (Class<? extends GlobalAggregator<I, V, E, M, P, F>>) conf.getClass(PregelixJob.GLOBAL_AGGREGATOR_CLASS,
+                GlobalCountAggregator.class, GlobalAggregator.class);
+    }
+
+    /** Get the value stored under PregelixJob.JOB_ID in the given configuration. */
+    public static String getJobId(Configuration conf) {
+        return conf.get(PregelixJob.JOB_ID);
+    }
+
+    /**
+     * Create a user message combiner
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user message combiner
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, M extends Writable, P extends Writable> MessageCombiner<I, M, P> createMessageCombiner(
+            Configuration conf) {
+        Class<? extends MessageCombiner<I, M, P>> vertexCombinerClass = getMessageCombinerClass(conf);
+        return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+    }
+
+    /**
+     * Create a global aggregator
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated global aggregator
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> GlobalAggregator<I, V, E, M, P, F> createGlobalAggregator(
+            Configuration conf) {
+        Class<? extends GlobalAggregator<I, V, E, M, P, F>> globalAggregatorClass = getGlobalAggregatorClass(conf);
+        return ReflectionUtils.newInstance(globalAggregatorClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed Vertex.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+            Configuration conf) {
+        return (Class<? extends Vertex<I, V, E, M>>) conf.getClass(PregelixJob.VERTEX_CLASS, null, Vertex.class);
+    }
+
+    /**
+     * Create a user vertex
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+            Configuration conf) {
+        Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+        return ReflectionUtils.newInstance(vertexClass, conf);
+    }
+
+    /**
+     * Get the user's subclassed vertex index class.
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return User's vertex index class
+     */
+    @SuppressWarnings("unchecked")
+    public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<I>) conf.getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
+    }
+
+    /**
+     * Create a user vertex index
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return Instantiated user vertex index
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable> I createVertexIndex(Configuration conf) {
+        Class<I> vertexIndexClass = getVertexIndexClass(conf);
+        try {
+            return vertexIndexClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexIndex: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexIndex: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex value class.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends Writable> Class<V> getVertexValueClass(Configuration conf) {
+        return (Class<V>) conf.getClass(PregelixJob.VERTEX_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex value
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user vertex value
+     */
+    public static <V extends Writable> V createVertexValue(Configuration conf) {
+        Class<V> vertexValueClass = getVertexValueClass(conf);
+        try {
+            return vertexValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed edge value class.
+     *
+     * @param conf
+     *            Configuration to check
+     * @return User's vertex edge value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <E extends Writable> Class<E> getEdgeValueClass(Configuration conf) {
+        return (Class<E>) conf.getClass(PregelixJob.EDGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user edge value
+     *
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user edge value
+     */
+    public static <E extends Writable> E createEdgeValue(Configuration conf) {
+        Class<E> edgeValueClass = getEdgeValueClass(conf);
+        try {
+            return edgeValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createEdgeValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createEdgeValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed vertex message value class.
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return User's vertex message value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed global aggregator's partial aggregate value class.
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return User's partial aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialAggregateValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed combiner's partial combine value class.
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return User's partial combine value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getPartialCombineValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Get the user's subclassed global aggregator's global value class.
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return User's final aggregate value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getFinalAggregateValueClass(Configuration conf) {
+        if (conf == null)
+            conf = defaultConf;
+        return (Class<M>) conf.getClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex message value
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return Instantiated user vertex message value
+     */
+    public static <M extends Writable> M createMessageValue(Configuration conf) {
+        Class<M> messageValueClass = getMessageValueClass(conf);
+        try {
+            return messageValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createMessageValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user partial aggregate value
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return Instantiated user partial aggregate value
+     */
+    public static <M extends Writable> M createPartialAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getPartialAggregateValueClass(conf);
+        try {
+            return aggregateValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createPartialAggregateValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createPartialAggregateValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user partial combine value
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return Instantiated user partial combine value
+     */
+    @SuppressWarnings("rawtypes")
+    public static <M extends Writable> M createPartialCombineValue(Configuration conf) {
+        Class<M> combineValueClass = getPartialCombineValueClass(conf);
+        try {
+            M instance = combineValueClass.newInstance();
+            if (instance instanceof MsgList) {
+                // set conf for msg list, if the value type is msglist
+                ((MsgList) instance).setConf(conf);
+            }
+            return instance;
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createPartialCombineValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createPartialCombineValue: Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Create a user final aggregate value
+     *
+     * @param conf
+     *            Configuration to check; the default configuration is used when null
+     * @return Instantiated user final aggregate value
+     */
+    public static <M extends Writable> M createFinalAggregateValue(Configuration conf) {
+        Class<M> aggregateValueClass = getFinalAggregateValueClass(conf);
+        try {
+            return aggregateValueClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createFinalAggregateValue: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createFinalAggregateValue: Illegally accessed", e);
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
new file mode 100644
index 0000000..1468431
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+
+/** Message combiner that keeps every message by appending it to the MsgList handed to init(). */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class DefaultMessageCombiner<I extends WritableComparable, M extends Writable> extends
+        MessageCombiner<I, M, MsgList> {
+    private MsgList<M> msgList; // the list supplied through init()
+
+    @Override
+    public void init(MsgList providedMsgList) {
+        msgList = providedMsgList;
+        msgList.clearElements();
+    }
+
+    @Override
+    public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
+        this.msgList.addElement(msg);
+    }
+
+    @Override
+    public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
+        this.msgList.addAllElements(partialAggregate);
+    }
+
+    @Override
+    public MsgList finishPartial() {
+        return this.msgList;
+    }
+
+    @Override
+    public MsgList<M> finishFinal() {
+        return this.msgList;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
new file mode 100644
index 0000000..402249e
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class FrameTupleUtils {
+    /** Append the tuple held by tb; if the frame rejects it, flush and reset the frame, then retry once. */
+    public static void flushTuple(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException("flushTuple: tuple of size " + tb.getSize() + " does not fit into a reset frame");
+            }
+        }
+    }
+
+    /** Flush the frame to the writer and reset the appender, but only if the appender holds tuples. */
+    public static void flushTuplesFinal(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+        }
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
new file mode 100644
index 0000000..1c1fa92
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/GlobalCountAggregator.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/** Global aggregator that counts step(Vertex) calls and adds up the partial counts. */
+@SuppressWarnings("rawtypes")
+public class GlobalCountAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+        extends GlobalAggregator<I, V, E, M, LongWritable, LongWritable> {
+    private LongWritable state = new LongWritable(0); // running count
+
+    @Override
+    public void init() {
+        this.state.set(0L);
+    }
+
+    @Override
+    public void step(Vertex<I, V, E, M> v) throws HyracksDataException {
+        this.state.set(1L + this.state.get());
+    }
+
+    @Override
+    public void step(LongWritable partialResult) {
+        final long merged = this.state.get() + partialResult.get();
+        this.state.set(merged);
+    }
+
+    @Override
+    public LongWritable finishPartial() {
+        return this.state;
+    }
+
+    @Override
+    public LongWritable finishFinal() {
+        return this.state;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
new file mode 100644
index 0000000..1366df1
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods to get type arguments to generic classes. Courtesy of Ian
+ * Robertson (overstock.com). Make sure to use with abstract generic classes,
+ * not interfaces.
+ */
+public class ReflectionUtils {
+    /**
+     * Do not instantiate.
+     */
+    private ReflectionUtils() {
+    }
+
+    /**
+     * Get the underlying class for a type, or null if the type is a variable
+     * type.
+     *
+     * @param type
+     *            the type
+     * @return the underlying class
+     */
+    public static Class<?> getClass(Type type) {
+        if (type instanceof Class) {
+            return (Class<?>) type;
+        } else if (type instanceof ParameterizedType) {
+            return getClass(((ParameterizedType) type).getRawType());
+        } else if (type instanceof GenericArrayType) {
+            Type componentType = ((GenericArrayType) type).getGenericComponentType();
+            Class<?> componentClass = getClass(componentType);
+            if (componentClass != null) {
+                return Array.newInstance(componentClass, 0).getClass();
+            } else {
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Get the actual type arguments a child class has used to extend a generic
+     * base class.
+     *
+     * @param <T> Type to evaluate.
+     * @param baseClass the base class
+     * @param childClass
+     *            the child class
+     * @return a list of the raw classes for the actual type arguments.
+     * @throws IllegalArgumentException
+     *             if baseClass is not reached through childClass's superclass chain
+     */
+    public static <T> List<Class<?>> getTypeArguments(Class<T> baseClass, Class<? extends T> childClass) {
+        Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
+        Type type = childClass;
+        // start walking up the inheritance hierarchy until we hit baseClass
+        while (!baseClass.equals(getClass(type))) {
+            if (type == null) {
+                throw new IllegalArgumentException("getTypeArguments: " + baseClass.getName()
+                        + " is not a superclass of " + childClass.getName());
+            }
+            if (type instanceof Class) {
+                // raw types carry no useful information, so just keep going.
+                type = ((Class<?>) type).getGenericSuperclass();
+            } else {
+                ParameterizedType parameterizedType = (ParameterizedType) type;
+                Class<?> rawType = (Class<?>) parameterizedType.getRawType();
+                Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+                TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
+                for (int i = 0; i < actualTypeArguments.length; i++) {
+                    resolvedTypes.put(typeParameters[i], actualTypeArguments[i]);
+                }
+                if (!rawType.equals(baseClass)) {
+                    type = rawType.getGenericSuperclass();
+                }
+            }
+        }
+
+        // finally, for each actual type argument provided to baseClass,
+        // determine (if possible) the raw class for that type argument.
+        Type[] actualTypeArguments;
+        if (type instanceof Class) {
+            actualTypeArguments = ((Class<?>) type).getTypeParameters();
+        } else {
+            actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
+        }
+        List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
+        // resolve types by chasing down type variables.
+        for (Type baseType : actualTypeArguments) {
+            while (resolvedTypes.containsKey(baseType)) {
+                baseType = resolvedTypes.get(baseType);
+            }
+            typeArgumentsAsClasses.add(getClass(baseType));
+        }
+        return typeArgumentsAsClasses;
+    }
+
+    /**
+     * Try to directly set a (possibly private) field on an Object.
+     *
+     * @param target
+     *            Target to set the field on.
+     * @param fieldname
+     *            Name of field, matched ignoring case.
+     * @param value
+     *            Value to set on target.
+     */
+    public static void setField(Object target, String fieldname, Object value) throws NoSuchFieldException,
+            IllegalAccessException {
+        Field field = findDeclaredField(target.getClass(), fieldname);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /**
+     * Find a declared field in a class or one of its super classes
+     *
+     * @param inClass
+     *            Class to search for declared field.
+     * @param fieldname
+     *            Field name to search for, matched ignoring case
+     * @return Field or will throw.
+     * @throws NoSuchFieldException
+     *             When field not found; the message is the requested name.
+     */
+    private static Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+        while (!Object.class.equals(inClass)) {
+            for (Field field : inClass.getDeclaredFields()) {
+                if (field.getName().equalsIgnoreCase(fieldname)) {
+                    return field;
+                }
+            }
+            inClass = inClass.getSuperclass();
+        }
+        throw new NoSuchFieldException(fieldname);
+    }
+}
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
new file mode 100755
index 0000000..5702642
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayInputStream.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link InputStream} over a caller-supplied byte array that can be re-pointed at another array
+ * or offset through {@link #setByteArray(byte[], int)}, so a single instance can be reused.
+ * <p>
+ * The array is used directly (not copied) and is read up to its full length.
+ * {@link #setByteArray(byte[], int)} must be called before the first read.
+ */
+public class ResetableByteArrayInputStream extends InputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayInputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayInputStream() {
+    }
+
+    /**
+     * Points this stream at a byte array.
+     *
+     * @param data
+     *            Array to read from; it is referenced, not copied.
+     * @param position
+     *            Index in {@code data} of the next byte to read.
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Reads the next byte of the array.
+     *
+     * @return The byte as a value in the range 0 to 255, or -1 if no bytes remain.
+     */
+    @Override
+    public int read() {
+        int remaining = data.length - position;
+        int value = remaining > 0 ? (data[position++] & 0xff) : -1;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
+        }
+        return value;
+    }
+
+    /**
+     * Copies up to {@code length} remaining bytes into {@code bytes}, starting at {@code offset}.
+     *
+     * @param bytes
+     *            Destination array.
+     * @param offset
+     *            Index in {@code bytes} at which the first byte is stored.
+     * @param length
+     *            Maximum number of bytes to copy.
+     * @return The number of bytes copied, 0 if {@code length} is 0, or -1 if no bytes remain.
+     */
+    @Override
+    public int read(byte[] bytes, int offset, int length) {
+        int remaining = data.length - position;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
+                    + length + " position: " + position);
+        }
+        if (length == 0) {
+            // InputStream contract: a zero-length read returns 0, even at the end of the data.
+            return 0;
+        }
+        if (remaining <= 0) {
+            // "<=" rather than "==": a position set past the end of the array must read as
+            // end-of-stream instead of passing a negative count to System.arraycopy.
+            return -1;
+        }
+        int count = Math.min(length, remaining);
+        System.arraycopy(data, position, bytes, offset, count);
+        position += count;
+        return count;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
new file mode 100755
index 0000000..307a3ae
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ResetableByteArrayOutputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An {@link OutputStream} that writes into a caller-supplied byte array and can be re-pointed at
+ * another array or offset through {@link #setByteArray(byte[], int)}, so a single instance can be
+ * reused.
+ * <p>
+ * The array is used directly (not copied) and is never grown: a write that does not fit throws
+ * {@link IndexOutOfBoundsException}. {@link #setByteArray(byte[], int)} must be called before the
+ * first write.
+ */
+public class ResetableByteArrayOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(ResetableByteArrayOutputStream.class.getName());
+
+    private byte[] data;
+    private int position;
+
+    public ResetableByteArrayOutputStream() {
+    }
+
+    /**
+     * Points this stream at a byte array.
+     *
+     * @param data
+     *            Array to write into; it is referenced, not copied.
+     * @param position
+     *            Index in {@code data} at which the next byte is stored.
+     */
+    public void setByteArray(byte[] data, int position) {
+        this.data = data;
+        this.position = position;
+    }
+
+    /**
+     * Stores the low-order eight bits of {@code b} at the current position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             If the array is already full.
+     */
+    @Override
+    public void write(int b) {
+        int remaining = data.length - position;
+        // The last slot of the array is writable too: only a full array is an overflow.
+        if (remaining < 1) {
+            throw new IndexOutOfBoundsException("no room for 1 byte at position " + position + " of "
+                    + data.length);
+        }
+        data[position] = (byte) b;
+        position++;
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(): value: " + b + " remaining: " + remaining + " position: " + position);
+        }
+    }
+
+    /**
+     * Copies {@code length} bytes of {@code bytes}, starting at {@code offset}, to the current position.
+     *
+     * @throws IndexOutOfBoundsException
+     *             If fewer than {@code length} bytes of room are left in the array.
+     */
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[], int, int) offset: " + offset + " length: " + length + " position: "
+                    + position);
+        }
+        // Compare against the room left (not position + length) so that a write ending exactly at
+        // the end of the array is accepted and a huge length cannot overflow the sum.
+        int remaining = data.length - position;
+        if (length > remaining) {
+            throw new IndexOutOfBoundsException("no room for " + length + " bytes at position " + position
+                    + " of " + data.length);
+        }
+        System.arraycopy(bytes, offset, data, position, length);
+        position += length;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
new file mode 100644
index 0000000..32c21ac
--- /dev/null
+++ b/fullstack/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Helpers for serializing Hadoop {@link Writable}s to byte arrays and for reading and writing
+ * variable-length encoded longs.
+ * <p>
+ * The variable-length format stores a long in groups of seven bits, least significant group
+ * first, one group per byte; the high bit of a byte is set when another byte follows. Values
+ * are treated as unsigned 64-bit quantities, so a negative long always occupies ten bytes.
+ */
+public class SerDeUtils {
+
+    /**
+     * Serializes a writable into a freshly allocated byte array.
+     *
+     * @param object
+     *            Writable to serialize.
+     * @return The bytes produced by {@link Writable#write(DataOutput)}.
+     * @throws IOException
+     *             If the writable fails to write itself.
+     */
+    public static byte[] serialize(Writable object) throws IOException {
+        ByteArrayOutputStream bbos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(bbos);
+        object.write(output);
+        return bbos.toByteArray();
+    }
+
+    /**
+     * Populates a writable from the bytes in a buffer.
+     *
+     * @param object
+     *            Writable whose fields are overwritten.
+     * @param buffer
+     *            Serialized form, read from index 0.
+     * @throws IOException
+     *             If the writable fails to read itself.
+     */
+    public static void deserialize(Writable object, byte[] buffer) throws IOException {
+        ByteArrayInputStream bbis = new ByteArrayInputStream(buffer);
+        DataInput input = new DataInputStream(bbis);
+        object.readFields(input);
+    }
+
+    /**
+     * Reads a variable-length encoded long from a data input.
+     *
+     * @param in
+     *            Input positioned at the first byte of the encoding.
+     * @return The decoded value.
+     * @throws IOException
+     *             If the input fails or ends before the final byte of the encoding.
+     */
+    public static long readVLong(DataInput in) throws IOException {
+        int vLen = 0;
+        long value = 0L;
+        while (true) {
+            byte b = in.readByte();
+            ++vLen;
+            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
+            if ((b & 0x80) == 0) {
+                break;
+            }
+        }
+        return value;
+    }
+
+    /**
+     * Writes a long in the variable-length encoding.
+     *
+     * @param out
+     *            Output to append the encoding to.
+     * @param value
+     *            Value to encode; may be negative.
+     * @throws IOException
+     *             If the output fails.
+     */
+    public static void writeVLong(DataOutput out, long value) throws IOException {
+        long data = value;
+        do {
+            byte b = (byte) (data & 0x7f);
+            // Unsigned shift: with ">>" a negative value settles at -1 and never reaches 0,
+            // so the loop would emit continuation bytes forever.
+            data >>>= 7;
+            if (data != 0) {
+                b |= 0x80;
+            }
+            out.write(b);
+        } while (data != 0);
+    }
+
+    /**
+     * Reads a variable-length encoded long from a byte array and checks its encoded size.
+     *
+     * @param data
+     *            Array holding the encoding.
+     * @param start
+     *            Index of the first byte of the encoding.
+     * @param length
+     *            Number of bytes the encoding is expected to occupy.
+     * @return The decoded value.
+     * @throws IllegalStateException
+     *             If the encoding does not occupy exactly {@code length} bytes.
+     */
+    public static long readVLong(byte[] data, int start, int length) {
+        int vLen = 0;
+        long value = 0L;
+        while (true) {
+            byte b = data[start];
+            ++vLen;
+            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
+            if ((b & 0x80) == 0) {
+                break;
+            }
+            ++start;
+        }
+        if (vLen != length) {
+            throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
+        }
+        return value;
+    }
+
+}