add Pregelix codebase
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
new file mode 100644
index 0000000..0c90fc4
--- /dev/null
+++ b/pregelix/pregelix-api/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pregelix-api</artifactId>
+ <name>pregelix-api</name>
+
+ <parent>
+ <groupId>edu.uci.ics.pregelix</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2</version>
+ <configuration>
+ <outputDirectory>target</outputDirectory>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </plugin>
+ <!-- Single surefire declaration. A groupId:artifactId pair should be
+ declared only once under build/plugins, so the two former
+ declarations (2.6 carrying systemProperties, 2.7.2 carrying
+ forkMode, argLine and includes) are merged here under the
+ newer version with the union of their settings. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx2047m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ <systemProperties>
+ <!-- NOTE(review): the jar name says pregelix-example although this
+ module is pregelix-api; kept unchanged, confirm it is intended. -->
+ <property>
+ <name>prop.jarLocation</name>
+ <value>target/pregelix-example-${project.version}-jar-with-dependencies.jar</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>.</directory>
+ <includes>
+ <include>teststore*</include>
+ <include>edu*</include>
+ <include>actual*</include>
+ <include>build*</include>
+ <include>expect*</include>
+ <include>ClusterController*</include>
+ <include>edu.uci.*</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
new file mode 100644
index 0000000..5ebf9df
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.delegate;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+
+@SuppressWarnings("rawtypes") // helper owned by a Vertex: serializes its outgoing message and alive tuples
+public class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /** Id of the vertex currently bound to this delegate; written as the first field of the alive tuple */
+ private I vertexId = null;
+ /** The vertex this delegate serves (the vertex object itself, not its value) */
+ private Vertex vertex = null;
+
+ /** message tuple builder, followed by the writer and appender it is flushed through */
+ private ArrayTupleBuilder message;
+ private IFrameWriter msgWriter;
+ private FrameTupleAppender appenderMsg;
+
+ /** alive tuple builder, followed by the writer and appender it is flushed through */
+ private ArrayTupleBuilder alive;
+ private IFrameWriter aliveWriter;
+ private FrameTupleAppender appenderAlive;
+
+ /** message list (never populated in this class) written as the second field of every alive tuple */
+ private MsgList dummyMessageList = new MsgList();
+ /** whether alive message should be pushed out; turned on once a second output writer is supplied */
+ private boolean pushAlive;
+
+ public VertexDelegate(Vertex vertex) { // binds the delegate to the given vertex
+ this.vertex = vertex;
+ }
+
+ public void finishCompute() throws IOException { // emits the alive tuple when applicable
+ // package alive info: only if an alive channel was configured and the vertex has not halted
+ if (pushAlive && !vertex.isHalted()) {
+ alive.reset();
+ DataOutput outputAlive = alive.getDataOutput();
+ vertexId.write(outputAlive); // field 0: vertex id
+ alive.addFieldEndOffset();
+ dummyMessageList.write(outputAlive); // field 1: the dummy message list
+ alive.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+ }
+ }
+
+ public final void sendMsg(I id, M msg) { // writes (id, msg) as a two-field tuple; rejects a null msg
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+
+ /**
+ * send out message along message channel; any failure is rethrown as IllegalStateException
+ */
+ try {
+ message.reset();
+ DataOutput outputMsg = message.getDataOutput();
+ id.write(outputMsg); // field 0: destination vertex id
+ message.addFieldEndOffset();
+ msg.write(outputMsg); // field 1: message payload
+ message.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderMsg, message, msgWriter);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public final void setVertex(Vertex vertex) { // rebinds the delegate to another vertex object
+ this.vertex = vertex;
+ }
+
+ public final void setVertexId(I vertexId) { // id later written by finishCompute()
+ this.vertexId = vertexId;
+ }
+
+ public final void setOutputWriters(List<IFrameWriter> outputs) {
+ msgWriter = outputs.get(0); // index 0 is always the message writer
+ if (outputs.size() > 1) {
+ aliveWriter = outputs.get(1); // optional index 1 is the alive writer
+ pushAlive = true; // alive tuples are only produced when that writer exists
+ }
+ }
+
+ public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ appenderMsg = appenders.get(0); // same index convention as setOutputWriters
+ if (appenders.size() > 1) {
+ appenderAlive = appenders.get(1);
+ }
+ }
+
+ public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ message = tbs.get(0); // same index convention as setOutputWriters
+ if (tbs.size() > 1) {
+ alive = tbs.get(1);
+ }
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
new file mode 100644
index 0000000..84bc8de
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Edge.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * A complete edge, the destination vertex and the edge value. Can only be one
+ * edge with a destination vertex id per edge map.
+ *
+ * @param <I>
+ * Vertex index
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class Edge<I extends WritableComparable, E extends Writable> implements Writable, Configurable, Comparable {
+ /** Destination vertex id */
+ private I destVertexId = null;
+ /** Edge value */
+ private E edgeValue = null;
+ /** Configuration - Used to instantiate classes */
+ private Configuration conf = null;
+ private boolean hasEdgeValue = false; // serialized flag: true when an edge value follows the id
+
+ /**
+ * Constructor for reflection
+ */
+ public Edge() {
+ }
+
+ /**
+ * Create the edge with final values
+ *
+ * @param destVertexId
+ * @param edgeValue
+ */
+ public Edge(I destVertexId, E edgeValue) {
+ this.destVertexId = destVertexId;
+ this.edgeValue = edgeValue;
+ // a null value means "no edge value"; write() then emits only the flag
+ this.hasEdgeValue = (edgeValue != null);
+ }
+
+ /**
+ * Get the destination vertex index of this edge
+ *
+ * @return Destination vertex index of this edge
+ */
+ public I getDestVertexId() {
+ return destVertexId;
+ }
+
+ /**
+ * set the destination vertex id
+ *
+ * @param destVertexId
+ */
+ public void setDestVertexId(I destVertexId) {
+ this.destVertexId = destVertexId;
+ }
+
+ /**
+ * Get the edge value of the edge
+ *
+ * @return Edge value of this edge
+ */
+ public E getEdgeValue() {
+ return edgeValue;
+ }
+
+ /**
+ * Set the edge value; passing null marks the edge as having no value
+ *
+ * @param edgeValue
+ */
+ public void setEdgeValue(E edgeValue) {
+ this.edgeValue = edgeValue;
+ // always resync the flag: a reused edge must not keep a stale "true", or write() would hit a null value
+ this.hasEdgeValue = (edgeValue != null);
+ }
+
+ @Override
+ public String toString() {
+ return "(DestVertexIndex = " + destVertexId + ", edgeValue = " + edgeValue + ")";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ if (destVertexId == null) // lazily create the id object, then reuse it on later reads
+ destVertexId = (I) BspUtils.createVertexIndex(getConf());
+ destVertexId.readFields(input);
+ hasEdgeValue = input.readBoolean();
+ if (hasEdgeValue) {
+ if (edgeValue == null) // same lazy creation for the value object
+ edgeValue = (E) BspUtils.createEdgeValue(getConf());
+ edgeValue.readFields(input);
+ }
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException { // layout: id, flag, then value only if flag is true
+ if (destVertexId == null) {
+ throw new IllegalStateException("write: Null destination vertex index");
+ }
+ destVertexId.write(output);
+ output.writeBoolean(hasEdgeValue);
+ if (hasEdgeValue)
+ edgeValue.write(output);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public boolean equals(Edge<I, E> edge) { // NOTE: an overload, not an override of Object.equals(Object)
+ return this.destVertexId.equals(edge.getDestVertexId()); // edges are equal when destination ids are equal
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(Object o) { // orders edges by destination vertex id only
+ Edge<I, E> edge = (Edge<I, E>) o;
+ return destVertexId.compareTo(edge.getDestVertexId());
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
new file mode 100644
index 0000000..57261f9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MsgList.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that allows the message class to be
+ * set prior to calling readFields(); the class is looked up from the configuration.
+ *
+ * @param <M>
+ * message type
+ */
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 100L;
+
+ /**
+ * Default constructor; only delegates to the superclass constructor.
+ */
+ public MsgList() {
+ super();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() { // resolves the element class via BspUtils from this list's configuration
+ setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
new file mode 100644
index 0000000..6a77349
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.pregelix.api.delegate.VertexDelegate;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
+/**
+ * User applications should all subclass {@link Vertex}. Package access should
+ * prevent users from accessing internal methods.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements Writable {
+ private static long superstep = 0; // class-wide current superstep
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** List of outgoing edges (destination vertex id plus edge value) */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context, shared by all vertices; source of the Configuration used to create objects */
+ private static Mapper.Context context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
+
+ /**
+ * use object pool for re-using objects; each used* counter is the next free slot of its pool
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
+
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ */
+ public abstract void compute(Iterator<M> msgIterator);
+
+ public final boolean addEdge(I targetVertexId, E edgeValue) { // appends a pooled edge; always returns true
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ return true;
+ }
+
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+ if (vertexId != null) { // null arguments leave the corresponding state untouched
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages); // appended: msgList is not cleared here, unlike destEdgeList
+ }
+ }
+
+ public void reset() { // makes every pooled object available for reuse again
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ }
+
+ private Edge<I, E> allocateEdge() { // next pooled edge, growing the pool when it is exhausted
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
+
+ private M allocateMessage() { // next pooled message, growing the pool when it is exhausted
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedMessage); // index with the message counter, not the edge counter
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
+
+ private V allocateValue() { // next pooled vertex value, growing the pool when it is exhausted
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue); // index with the value counter, not the edge counter
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
+
+ public final void setVertexId(I vertexId) { // keeps the delegate's copy of the id in sync
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
+
+ public final I getVertexId() {
+ return vertexId;
+ }
+
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
+
+ public static long getCurrentSuperstep() {
+ return superstep;
+ }
+
+ public final long getSuperstep() { // instance-level view of the same class-wide counter
+ return superstep;
+ }
+
+ public final V getVertexValue() {
+ return vertexValue;
+ }
+
+ public final void setVertexValue(V vertexValue) { // also marks the vertex as updated
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
+
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
+
+ public final long getNumVertices() {
+ return numVertices;
+ }
+
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
+
+ public final long getNumEdges() {
+ return numEdges;
+ }
+
+ public final void sendMsg(I id, M msg) { // forwards to the delegate and records that a message was sent
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
+
+ public final void sendMsgToAllEdges(M msg) { // sends msg once per outgoing edge
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
+
+ public final void voteToHalt() {
+ halt = true;
+ }
+
+ public final boolean isHalted() {
+ return halt;
+ }
+
+ @Override
+ final public void readFields(DataInput in) throws IOException { // mirror image of write()
+ reset(); // pooled objects of the previously read vertex become reusable
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration()); // the edge needs the conf to create its id/value
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false; // a freshly read vertex has no pending update or outgoing message
+ hasMessage = false;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException { // layout: id, value flag, [value], edges, messages, halt
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
+
+ private boolean addEdge(Edge<I, E> edge) { // appends an existing edge after handing it the configuration
+ edge.setConf(getContext().getConfiguration());
+ destEdgeList.add(edge);
+ return true;
+ }
+
+ public List<M> getMsgList() { // returns the live internal list, not a copy
+ return msgList;
+ }
+
+ public List<Edge<I, E>> getEdges() { // returns the live internal list, not a copy
+ return this.destEdgeList;
+ }
+
+ public final Mapper<?, ?, ?, ?>.Context getContext() {
+ return context;
+ }
+
+ public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ Vertex.context = context;
+ }
+
+ @SuppressWarnings("unchecked")
+ public String toString() { // NOTE: sorts destEdgeList in place as a side effect
+ Collections.sort(destEdgeList);
+ StringBuilder edgeBuffer = new StringBuilder();
+ edgeBuffer.append("(");
+ for (Edge<I, E> edge : destEdgeList) {
+ edgeBuffer.append(edge.getDestVertexId()).append(",");
+ }
+ edgeBuffer.append(")");
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+ }
+
+ public void setOutputWriters(List<IFrameWriter> writers) { // the four methods below only forward to the delegate
+ delegate.setOutputWriters(writers);
+ }
+
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
+
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
+
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
+
+ public boolean hasUpdate() { // true once setVertexValue was called since the last readFields
+ return this.updated;
+ }
+
+ public boolean hasMessage() { // true once sendMsg was called since the last readFields
+ return this.hasMessage;
+ }
+
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void sortEdges() { // sorts edges in place using Edge.compareTo
+ Collections.sort((List) destEdgeList);
+ }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
new file mode 100644
index 0000000..c54b4ea
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * interface to implement for combining of messages sent to the same vertex.
+ *
+ * @param <I> vertex index type
+ * @param <M> message data type
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexCombiner<I extends WritableComparable, M extends Writable> {
+
+ /**
+ * initialize combiner
+ */
+ public void init();
+
+ /**
+ * step call: presumably offers one message addressed to the given vertex to the combiner
+ *
+ * @param vertexIndex
+ * @param msg
+ * @throws IOException
+ */
+ public void step(I vertexIndex, M msg) throws IOException;
+
+ /**
+ * finish aggregate
+ *
+ * @return Message
+ */
+ public M finish();
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
new file mode 100644
index 0000000..cfb2a5e
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This InputSplit will not give any ordering or location data. It is used
+ * internally by BspInputFormat (which determines how many tasks to run the
+ * application on). Users should not use this directly.
+ */
+public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
+ private static final long serialVersionUID = 1L;
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
+
+ public BasicGenInputSplit() { // no-arg constructor; both fields keep their -1 defaults until readFields
+ }
+
+ public BasicGenInputSplit(int splitIndex, int numSplits) {
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException { // this split reports no length
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException { // this split reports no locations
+ return new String[] {};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException { // same field order as write()
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException { // split index first, then number of splits
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
+
+ public int getSplitIndex() {
+ return splitIndex;
+ }
+
+ public int getNumSplits() {
+ return numSplits;
+ }
+
+ @Override
+ public String toString() { // the opening quote is now closed at the end of the text
+ return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits() + "'";
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
new file mode 100644
index 0000000..4da19de
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexInputFormat.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Use this to load data for a BSP application. Note that the InputSplit must
+ * also implement Writable. The InputSplits will determine the partitioning of
+ * vertices across the mappers, so keep that in consideration when implementing
+ * getSplits().
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Logically split the vertices for a graph processing application.
+ *
+ * Each {@link InputSplit} is then assigned to a worker for processing.
+ *
+ * <p>
+ * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+ * input files are not physically split into chunks. For example, a split could
+ * be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+ * also creates the {@link VertexReader} to read the {@link InputSplit}.
+ *
+ * Also, the number of workers is a hint given to the developer to try to
+ * intelligently determine how many splits to create (if this is adjustable)
+ * at runtime.
+ *
+ * @param context
+ * Context of the job
+ * @param numWorkers
+ * Number of workers used for this job
+ * @return a list of {@link InputSplit}s for the job.
+ */
+ public abstract List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException,
+ InterruptedException;
+
+ /**
+ * Create a vertex reader for a given split. The framework will call
+ * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param split
+ * the split to be read
+ * @param context
+ * the information about the task
+ * @return a new vertex reader
+ * @throws IOException
+ * the only checked exception declared by this method
+ */
+ public abstract VertexReader<I, V, E, M> createVertexReader(InputSplit split, TaskAttemptContext context)
+ throws IOException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
new file mode 100644
index 0000000..4675afe
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexOutputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implement to output the graph after the computation. It is modeled directly
+ * after the Hadoop OutputFormat.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class VertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> {
+ /**
+ * Create a vertex writer for the given task. The framework will call
+ * {@link VertexWriter#initialize(TaskAttemptContext)} before the writer
+ * is used.
+ *
+ * @param context
+ * the information about the task
+ * @return a new vertex writer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException;
+
+ /**
+ * Check for validity of the output-specification for the job. (Copied from
+ * Hadoop OutputFormat)
+ *
+ * <p>
+ * This is to validate the output specification for the job when the job is
+ * submitted. Typically checks that the output does not already exist, throwing
+ * an exception when it already exists, so that output is not overwritten.
+ * </p>
+ *
+ * @param context
+ * information about the job
+ * @throws IOException
+ * when output should not be attempted
+ */
+ public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
+
+ /**
+ * Get the output committer for this output format. This is responsible for
+ * ensuring the output is committed correctly. (Copied from Hadoop
+ * OutputFormat)
+ *
+ * @param context
+ * the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
new file mode 100644
index 0000000..2794231
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Analogous to {@link RecordReader} for vertices. Will read the vertices from
+ * an input split.
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex data
+ * @param <E>
+ * Edge data
+ * @param <M>
+ * Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Use the input split and context to set up reading the vertices.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param inputSplit
+ * Input split to be used for reading vertices.
+ * @param context
+ * Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * Try to advance to the next vertex of the input.
+ * @return false iff there are no more vertices
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean nextVertex() throws IOException, InterruptedException;
+
+ /**
+ * Get the current vertex.
+ *
+ * @return the current vertex which has been read. nextVertex() should be
+ * called first.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ Vertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link VertexReader} to future operations.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * Report how much of the input this {@link VertexReader} has consumed,
+ * i.e. how much of it has been processed so far.
+ *
+ * @return Progress from <code>0.0</code> to <code>1.0</code>.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ float getProgress() throws IOException, InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
new file mode 100644
index 0000000..a7654a9
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/VertexWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+/**
+ * Implement to output a vertex range of the graph after the computation.
+ *
+ * @param <I>
+ * Vertex id
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexWriter<I extends WritableComparable, V extends Writable, E extends Writable> {
+ /**
+ * Use the context to set up writing the vertices. Guaranteed to be called
+ * prior to any other function.
+ *
+ * @param context
+ * Context used to write the vertices.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void initialize(TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * Writes the next vertex and associated data.
+ *
+ * @param vertex
+ * the vertex to be written (its message type is irrelevant here)
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link VertexWriter} to future operations.
+ *
+ * @param context
+ * the context of the task
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void close(TaskAttemptContext context) throws IOException, InterruptedException;
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..12924d2
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging. It simply generates
+ * some vertex data that can be consumed by test applications.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ // The splits are placeholders, one per worker; the VertexReader will
+ // generate all the test data.
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < numWorkers; ++i) {
+ inputSplitList.add(new BasicGenInputSplit(i, numWorkers));
+ }
+ return inputSplitList;
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
new file mode 100644
index 0000000..fc4182b
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/generated/GeneratedVertexReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.generated;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.io.BasicGenInputSplit;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Used by GeneratedVertexInputFormat to read some generated data.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Records read so far (maintained by subclasses) */
+ protected long recordsRead = 0;
+ /** Total records to read (on this split alone), taken from READER_VERTICES */
+ protected long totalRecords = 0;
+ /** The input split from initialize(). */
+ protected BasicGenInputSplit inputSplit = null;
+ /** Reverse the id order? Taken from REVERSE_ID_ORDER. */
+ protected boolean reverseIdOrder;
+ /** Job configuration captured in initialize(). */
+ protected Configuration configuration = null;
+
+ public static final String READER_VERTICES = "GeneratedVertexReader.reader_vertices";
+ public static final long DEFAULT_READER_VERTICES = 10;
+ public static final String REVERSE_ID_ORDER = "GeneratedVertexReader.reverseIdOrder";
+ public static final boolean DEAFULT_REVERSE_ID_ORDER = false; // (sic) misspelled public name kept for compatibility
+
+ public GeneratedVertexReader() {
+ }
+
+ @Override
+ final public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
+ configuration = context.getConfiguration();
+ totalRecords = configuration.getLong(GeneratedVertexReader.READER_VERTICES,
+ GeneratedVertexReader.DEFAULT_READER_VERTICES);
+ reverseIdOrder = configuration.getBoolean(GeneratedVertexReader.REVERSE_ID_ORDER,
+ GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+ this.inputSplit = (BasicGenInputSplit) inputSplit;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ // Progress is a fraction in [0.0, 1.0] (not a percentage); 0 when there is nothing to read.
+ @Override
+ final public float getProgress() throws IOException {
+ return totalRecords <= 0 ? 0.0f : Math.min(1.0f, (float) recordsRead / totalRecords);
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
new file mode 100644
index 0000000..4c804db
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexInputFormat.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexReader;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * input format.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ * @param <M>
+ * Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+ /** Hadoop TextInputFormat that getSplits() delegates to */
+ protected TextInputFormat textInputFormat = new TextInputFormat();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex input. Easiest to ignore the key value separator and only use key
+ * instead.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class TextVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /** Internal line record reader */
+ private final RecordReader<LongWritable, Text> lineRecordReader;
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+
+ /**
+ * Initialize with the LineRecordReader.
+ *
+ * @param lineRecordReader
+ * Line record reader from TextInputFormat
+ */
+ public TextVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ this.lineRecordReader = lineRecordReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordReader.initialize(inputSplit, context);
+ this.context = context;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lineRecordReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return lineRecordReader.getProgress();
+ }
+
+ /**
+ * Get the line record reader.
+ *
+ * @return Record reader to be used for reading.
+ */
+ protected RecordReader<LongWritable, Text> getRecordReader() {
+ return lineRecordReader;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize (null until initialize is called).
+ */
+ protected TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers) throws IOException, InterruptedException {
+ // Ignore the hint of numWorkers here since we are using TextInputFormat
+ // to do this for us
+ return textInputFormat.getSplits(context);
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
new file mode 100644
index 0000000..c026945
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/text/TextVertexOutputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+
+/**
+ * Abstract class that users should subclass to use their own text based vertex
+ * output format.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable>
+ extends VertexOutputFormat<I, V, E> {
+ /** Hadoop TextOutputFormat that checkOutputSpecs() and getOutputCommitter() delegate to */
+ protected TextOutputFormat<Text, Text> textOutputFormat = new TextOutputFormat<Text, Text>();
+
+ /**
+ * Abstract class to be implemented by the user based on their specific
+ * vertex output. Easiest to ignore the key value separator and only use key
+ * instead.
+ *
+ * @param <I>
+ * Vertex index value
+ * @param <V>
+ * Vertex value
+ * @param <E>
+ * Edge value
+ */
+ public static abstract class TextVertexWriter<I extends WritableComparable, V extends Writable, E extends Writable>
+ implements VertexWriter<I, V, E> {
+ /** Context passed to initialize */
+ private TaskAttemptContext context;
+ /** Internal line record writer */
+ private final RecordWriter<Text, Text> lineRecordWriter;
+
+ /**
+ * Initialize with the LineRecordWriter.
+ *
+ * @param lineRecordWriter
+ * Line record writer from TextOutputFormat
+ */
+ public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+ this.lineRecordWriter = lineRecordWriter;
+ }
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException {
+ this.context = context;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ lineRecordWriter.close(context);
+ }
+
+ /**
+ * Get the line record writer.
+ *
+ * @return Record writer to be used for writing.
+ */
+ public RecordWriter<Text, Text> getRecordWriter() {
+ return lineRecordWriter;
+ }
+
+ /**
+ * Get the context.
+ *
+ * @return Context passed to initialize (null until initialize is called).
+ */
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ textOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return textOutputFormat.getOutputCommitter(context);
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
new file mode 100644
index 0000000..f9a05cb
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+
+/**
+ * Limits the functions that can be called by the user. Job is too flexible for
+ * our needs. For instance, our job should not have any reduce tasks.
+ */
+public class PregelixJob extends Job {
+ /** Vertex class - required */
+ public static final String VERTEX_CLASS = "giraph.vertexClass";
+ /** VertexInputFormat class - required */
+ public static final String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+ /** VertexOutputFormat class - optional */
+ public static final String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+ /** Vertex combiner class - optional */
+ public static final String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+ /** Vertex resolver class - optional */
+ public static final String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+ /** Vertex index class */
+ public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+ /** Vertex value class */
+ public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+ /** Edge value class */
+ public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+ /** Message value class */
+ public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+ /** Aggregator class */
+ public static final String AGGREGATOR_NAME = "giraph.aggregatorClass";
+ /** Number of vertices */
+ public static final String NUM_VERTICE = "giraph.numVertices";
+ /** Number of edges */
+ public static final String NUM_EDGES = "giraph.numEdges";
+
+ /**
+ * Constructor that creates a fresh default {@link Configuration} for the job.
+ *
+ * @param jobName
+ * User-defined job name
+ * @throws IOException
+ */
+ public PregelixJob(String jobName) throws IOException {
+ super(new Configuration(), jobName);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param conf
+ * User-defined configuration
+ * @param jobName
+ * User-defined job name
+ * @throws IOException
+ */
+ public PregelixJob(Configuration conf, String jobName) throws IOException {
+ super(conf, jobName);
+ }
+
+ /**
+ * Set the vertex class (required); stored under {@link #VERTEX_CLASS}.
+ *
+ * @param vertexClass
+ * Runs vertex computation
+ */
+ final public void setVertexClass(Class<?> vertexClass) {
+ getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+ }
+
+ /**
+ * Set the vertex input format class (required)
+ *
+ * @param vertexInputFormatClass
+ * Determines how graph is input
+ */
+ final public void setVertexInputFormatClass(Class<?> vertexInputFormatClass) {
+ getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS, vertexInputFormatClass, VertexInputFormat.class);
+ }
+
+ /**
+ * Set the vertex output format class (optional)
+ *
+ * @param vertexOutputFormatClass
+ * Determines how graph is output
+ */
+ final public void setVertexOutputFormatClass(Class<?> vertexOutputFormatClass) {
+ getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS, vertexOutputFormatClass, VertexOutputFormat.class);
+ }
+
+ /**
+ * Set the vertex combiner class (optional)
+ *
+ * @param vertexCombinerClass
+ * Determines how vertex messages are combined
+ */
+ final public void setVertexCombinerClass(Class<?> vertexCombinerClass) {
+ getConfiguration().setClass(VERTEX_COMBINER_CLASS, vertexCombinerClass, VertexCombiner.class);
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
new file mode 100644
index 0000000..47a6c4c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ArrayListWritable.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to Yahoo! under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Yahoo! licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A Writable for ArrayList containing instances of a class.
+ */
+public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M> implements Writable, Configurable {
+ /** Used for instantiation */
+ private Class<M> refClass = null;
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 1L;
+ /** Configuration */
+ private Configuration conf;
+ /** content object pool; instances are reused across readFields() calls */
+ private List<M> pool = new ArrayList<M>();
+ /** how many instances in the pool have been used */
+ private int used = 0;
+
+ /**
+ * Using the default constructor requires that the user implement
+ * setClass(), guaranteed to be invoked prior to instantiation in
+ * readFields()
+ */
+ public ArrayListWritable() {
+ }
+
+ /**
+ * This constructor allows setting the refClass during construction.
+ *
+ * @param refClass
+ * internal type class
+ */
+ public ArrayListWritable(Class<M> refClass) {
+ super();
+ this.refClass = refClass;
+ }
+
+ /**
+ * This is a one-time operation to set the class type; a second call throws.
+ *
+ * @param refClass
+ * internal type class
+ */
+ public void setClass(Class<M> refClass) {
+ if (this.refClass != null) {
+ throw new RuntimeException("setClass: refClass is already set to " + this.refClass.getName());
+ }
+ this.refClass = refClass;
+ }
+
+ /**
+ * Subclasses must set the class type appropriately and can use
+ * setClass(Class<M> refClass) to do it.
+ */
+ public abstract void setClass();
+
+ public void readFields(DataInput in) throws IOException {
+ if (this.refClass == null) {
+ setClass();
+ }
+ used = 0; // recycle pooled instances from the start; earlier contents get overwritten
+ this.clear();
+ int numValues = in.readInt(); // read number of values
+ ensureCapacity(numValues);
+ for (int i = 0; i < numValues; i++) {
+ M value = allocateValue();
+ value.readFields(in); // read a value
+ add(value); // store it in values
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ int numValues = size();
+ out.writeInt(numValues); // write number of values
+ for (int i = 0; i < numValues; i++) {
+ get(i).write(out);
+ }
+ }
+
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ private M allocateValue() {
+ if (used >= pool.size()) { // pool exhausted: create a new instance and remember it
+ M value = ReflectionUtils.newInstance(refClass, conf);
+ pool.add(value);
+ used++;
+ return value;
+ } else { // reuse the next pooled instance
+ M value = pool.get(used);
+ used++;
+ return value;
+ }
+ }
+
+ public void reset(ArrayIterator<M> iterator) {
+ iterator.reset(this);
+ }
+
+ public static class ArrayIterator<M> implements Iterator<M> {
+ /** Index of the next element to return; the list is supplied via ArrayListWritable#reset. */
+ private int pos = 0;
+ private List<M> list;
+
+ private void reset(List<M> list) {
+ this.list = list;
+ pos = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return pos < list.size();
+ }
+
+ @Override
+ public M next() {
+ // Honor the Iterator contract instead of leaking IndexOutOfBoundsException.
+ if (pos >= list.size()) { throw new java.util.NoSuchElementException(); }
+ return list.get(pos++);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("should not be called");
+ }
+
+ }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
new file mode 100644
index 0000000..9c31646
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.graph.VertexCombiner;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Static helpers that look up the user-supplied Pregelix classes (vertex,
+ * input/output formats, combiner, and the index/value/edge/message types) in
+ * a Hadoop {@link Configuration} and instantiate them.
+ * <p>
+ * Every method accepts a {@code null} configuration and then falls back to the
+ * one registered through {@link #setDefaultConfiguration(Configuration)}.
+ */
+public class BspUtils {
+    /** Fallback configuration used whenever a caller passes {@code null}. */
+    private static Configuration defaultConf = null;
+
+    /**
+     * Register the configuration used when a method of this class receives a
+     * {@code null} configuration.
+     *
+     * @param conf
+     *            Configuration to fall back to
+     */
+    public static void setDefaultConfiguration(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    /**
+     * Pick the configuration to read from: the argument when it is non-null,
+     * otherwise the registered default (which may itself be {@code null}).
+     */
+    private static Configuration effectiveConf(Configuration conf) {
+        return conf != null ? conf : defaultConf;
+    }
+
+    /**
+     * Instantiate a class through its no-argument constructor, translating the
+     * checked reflection failures into {@link IllegalArgumentException}s whose
+     * message is prefixed with the name of the calling factory method.
+     */
+    private static <T> T instantiate(Class<T> clazz, String caller) {
+        try {
+            return clazz.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException(caller + ": Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException(caller + ": Illegally accessed", e);
+        }
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexInputFormat}.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex input format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends VertexInputFormat<I, V, E, M>> getVertexInputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexInputFormat<I, V, E, M>>) effectiveConf(conf).getClass(
+                PregelixJob.VERTEX_INPUT_FORMAT_CLASS, null, VertexInputFormat.class);
+    }
+
+    /**
+     * Create a user vertex input format class
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex input format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> VertexInputFormat<I, V, E, M> createVertexInputFormat(
+            Configuration conf) {
+        Configuration effective = effectiveConf(conf);
+        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = getVertexInputFormatClass(effective);
+        VertexInputFormat<I, V, E, M> inputFormat = ReflectionUtils.newInstance(vertexInputFormatClass, effective);
+        return inputFormat;
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexOutputFormat}.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex output format class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> Class<? extends VertexOutputFormat<I, V, E>> getVertexOutputFormatClass(
+            Configuration conf) {
+        return (Class<? extends VertexOutputFormat<I, V, E>>) effectiveConf(conf).getClass(
+                PregelixJob.VERTEX_OUTPUT_FORMAT_CLASS, null, VertexOutputFormat.class);
+    }
+
+    /**
+     * Create a user vertex output format class
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex output format class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable> VertexOutputFormat<I, V, E> createVertexOutputFormat(
+            Configuration conf) {
+        Configuration effective = effectiveConf(conf);
+        Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass = getVertexOutputFormatClass(effective);
+        return ReflectionUtils.newInstance(vertexOutputFormatClass, effective);
+    }
+
+    /**
+     * Get the user's subclassed {@link VertexCombiner}.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex combiner class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, M extends Writable> Class<? extends VertexCombiner<I, M>> getVertexCombinerClass(
+            Configuration conf) {
+        return (Class<? extends VertexCombiner<I, M>>) effectiveConf(conf).getClass(
+                PregelixJob.VERTEX_COMBINER_CLASS, null, VertexCombiner.class);
+    }
+
+    /**
+     * Create a user vertex combiner class
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex combiner class
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, M extends Writable> VertexCombiner<I, M> createVertexCombiner(
+            Configuration conf) {
+        Configuration effective = effectiveConf(conf);
+        Class<? extends VertexCombiner<I, M>> vertexCombinerClass = getVertexCombinerClass(effective);
+        return ReflectionUtils.newInstance(vertexCombinerClass, effective);
+    }
+
+    /**
+     * Get the user's subclassed Vertex.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex class
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Class<? extends Vertex<I, V, E, M>> getVertexClass(
+            Configuration conf) {
+        return (Class<? extends Vertex<I, V, E, M>>) effectiveConf(conf).getClass(PregelixJob.VERTEX_CLASS, null,
+                Vertex.class);
+    }
+
+    /**
+     * Create a user vertex
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> Vertex<I, V, E, M> createVertex(
+            Configuration conf) {
+        Configuration effective = effectiveConf(conf);
+        Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(effective);
+        Vertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, effective);
+        return vertex;
+    }
+
+    /**
+     * Get the user's subclassed vertex index class.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex index class
+     */
+    @SuppressWarnings("unchecked")
+    public static <I extends Writable> Class<I> getVertexIndexClass(Configuration conf) {
+        return (Class<I>) effectiveConf(conf).getClass(PregelixJob.VERTEX_INDEX_CLASS, WritableComparable.class);
+    }
+
+    /**
+     * Create a user vertex index
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex index
+     * @throws IllegalArgumentException
+     *             If the class cannot be instantiated or accessed
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable> I createVertexIndex(Configuration conf) {
+        Class<I> vertexClass = getVertexIndexClass(conf);
+        return instantiate(vertexClass, "createVertexIndex");
+    }
+
+    /**
+     * Get the user's subclassed vertex value class.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends Writable> Class<V> getVertexValueClass(Configuration conf) {
+        return (Class<V>) effectiveConf(conf).getClass(PregelixJob.VERTEX_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex value
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex value
+     * @throws IllegalArgumentException
+     *             If the class cannot be instantiated or accessed
+     */
+    public static <V extends Writable> V createVertexValue(Configuration conf) {
+        Class<V> vertexValueClass = getVertexValueClass(conf);
+        return instantiate(vertexValueClass, "createVertexValue");
+    }
+
+    /**
+     * Get the user's subclassed edge value class.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex edge value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <E extends Writable> Class<E> getEdgeValueClass(Configuration conf) {
+        return (Class<E>) effectiveConf(conf).getClass(PregelixJob.EDGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user edge value
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user edge value
+     * @throws IllegalArgumentException
+     *             If the class cannot be instantiated or accessed
+     */
+    public static <E extends Writable> E createEdgeValue(Configuration conf) {
+        Class<E> edgeValueClass = getEdgeValueClass(conf);
+        return instantiate(edgeValueClass, "createEdgeValue");
+    }
+
+    /**
+     * Get the user's subclassed vertex message value class.
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return User's vertex message value class
+     */
+    @SuppressWarnings("unchecked")
+    public static <M extends Writable> Class<M> getMessageValueClass(Configuration conf) {
+        return (Class<M>) effectiveConf(conf).getClass(PregelixJob.MESSAGE_VALUE_CLASS, Writable.class);
+    }
+
+    /**
+     * Create a user vertex message value
+     *
+     * @param conf
+     *            Configuration to check, or null to use the default one
+     * @return Instantiated user vertex message value
+     * @throws IllegalArgumentException
+     *             If the class cannot be instantiated or accessed
+     */
+    public static <M extends Writable> M createMessageValue(Configuration conf) {
+        Class<M> messageValueClass = getMessageValueClass(conf);
+        return instantiate(messageValueClass, "createMessageValue");
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
new file mode 100644
index 0000000..3742afa
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Helpers for appending tuples to a frame and pushing frames to a writer.
+ */
+public class FrameTupleUtils {
+
+    /**
+     * Append the tuple held by the builder to the appender's frame. When the
+     * append is refused, the current frame is flushed to the writer, the
+     * appender is reset on the same buffer, and the append is retried once.
+     *
+     * @param appender
+     *            Appender wrapping the frame being filled
+     * @param tb
+     *            Builder holding the tuple to append
+     * @param writer
+     *            Destination for flushed frames
+     * @throws HyracksDataException
+     *             If flushing the frame fails
+     * @throws IllegalStateException
+     *             If the tuple is still refused after the frame was flushed
+     *             and reset
+     */
+    public static void flushTuple(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
+            throws HyracksDataException {
+        if (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            return;
+        }
+        // the frame refused the tuple: ship what is buffered and retry once
+        FrameUtils.flushFrame(appender.getBuffer(), writer);
+        appender.reset(appender.getBuffer(), true);
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            throw new IllegalStateException("flushTuple: tuple of " + tb.getSize()
+                    + " bytes could not be appended even after flushing the frame");
+        }
+    }
+
+    /**
+     * Flush the appender's frame to the writer if it holds at least one tuple,
+     * then reset the appender on the same buffer. Does nothing when the
+     * appender reports a tuple count of zero.
+     *
+     * @param appender
+     *            Appender wrapping the frame to flush
+     * @param writer
+     *            Destination for the flushed frame
+     * @throws HyracksDataException
+     *             If flushing the frame fails
+     */
+    public static void flushTuplesFinal(FrameTupleAppender appender, IFrameWriter writer) throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
new file mode 100644
index 0000000..d599f34
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ReflectionUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.api.util;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods to get type arguments to generic classes. Courtesy of Ian
+ * Robertson (overstock.com). Make sure to use with abstract generic classes,
+ * not interfaces.
+ */
+public class ReflectionUtils {
+    /**
+     * Do not instantiate.
+     */
+    private ReflectionUtils() {
+    }
+
+    /**
+     * Get the underlying class for a type, or null if the type is a variable
+     * type.
+     *
+     * @param type
+     *            the type
+     * @return the underlying class, or null when it cannot be determined
+     *         (type variables, wildcards, and a null argument)
+     */
+    public static Class<?> getClass(Type type) {
+        if (type instanceof Class) {
+            return (Class<?>) type;
+        }
+        if (type instanceof ParameterizedType) {
+            return getClass(((ParameterizedType) type).getRawType());
+        }
+        if (type instanceof GenericArrayType) {
+            Class<?> componentClass = getClass(((GenericArrayType) type).getGenericComponentType());
+            if (componentClass == null) {
+                return null;
+            }
+            // build a zero-length array only to obtain the array class
+            return Array.newInstance(componentClass, 0).getClass();
+        }
+        return null;
+    }
+
+    /**
+     * Get the actual type arguments a child class has used to extend a generic
+     * base class.
+     *
+     * @param <T>
+     *            Type to evaluate.
+     * @param baseClass
+     *            the base class; must be reachable from childClass through
+     *            the superclass chain (interfaces are not followed)
+     * @param childClass
+     *            the child class
+     * @return a list of the raw classes for the actual type arguments; an
+     *         entry is null when the argument could not be resolved to a class
+     * @throws IllegalArgumentException
+     *             If baseClass is not found on the superclass chain of
+     *             childClass
+     */
+    public static <T> List<Class<?>> getTypeArguments(Class<T> baseClass, Class<? extends T> childClass) {
+        Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
+        Type type = childClass;
+        // start walking up the inheritance hierarchy until we hit baseClass
+        while (true) {
+            if (type == null) {
+                // walked past Object (or started from null): baseClass is not a superclass
+                throw new IllegalArgumentException("getTypeArguments: " + baseClass
+                        + " is not on the superclass chain of " + childClass);
+            }
+            Class<?> current = getClass(type);
+            if (current.equals(baseClass)) {
+                break;
+            }
+            if (type instanceof Class) {
+                // there is no useful information for us in raw types,
+                // so just keep going.
+                type = current.getGenericSuperclass();
+            } else {
+                // remember how this level binds its own type variables
+                ParameterizedType parameterizedType = (ParameterizedType) type;
+                Type[] boundArguments = parameterizedType.getActualTypeArguments();
+                TypeVariable<?>[] typeParameters = current.getTypeParameters();
+                for (int i = 0; i < boundArguments.length; i++) {
+                    resolvedTypes.put(typeParameters[i], boundArguments[i]);
+                }
+                // current differs from baseClass here, so always move up
+                type = current.getGenericSuperclass();
+            }
+        }
+
+        // finally, for each actual type argument provided to baseClass,
+        // determine (if possible)
+        // the raw class for that type argument.
+        Type[] actualTypeArguments;
+        if (type instanceof Class) {
+            actualTypeArguments = ((Class<?>) type).getTypeParameters();
+        } else {
+            actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
+        }
+        List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
+        // resolve types by chasing down type variables.
+        for (Type baseType : actualTypeArguments) {
+            while (resolvedTypes.containsKey(baseType)) {
+                baseType = resolvedTypes.get(baseType);
+            }
+            typeArgumentsAsClasses.add(getClass(baseType));
+        }
+        return typeArgumentsAsClasses;
+    }
+
+    /**
+     * Try to directly set a (possibly private) field on an Object. The field
+     * is looked up by name, ignoring case, in the target's class and then in
+     * its superclasses.
+     *
+     * @param target
+     *            Target to set the field on.
+     * @param fieldname
+     *            Name of field.
+     * @param value
+     *            Value to set on target.
+     * @throws NoSuchFieldException
+     *             When no matching field is found.
+     * @throws IllegalAccessException
+     *             When the field cannot be written.
+     */
+    public static void setField(Object target, String fieldname, Object value) throws NoSuchFieldException,
+            IllegalAccessException {
+        Field field = findDeclaredField(target.getClass(), fieldname);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    /**
+     * Find a declared field in a class or one of its super classes. Names are
+     * compared ignoring case; fields declared by {@link Object} are not
+     * searched.
+     *
+     * @param inClass
+     *            Class to search for declared field.
+     * @param fieldname
+     *            Field name to search for
+     * @return Field or will throw.
+     * @throws NoSuchFieldException
+     *             When field not found.
+     */
+    private static Field findDeclaredField(Class<?> inClass, String fieldname) throws NoSuchFieldException {
+        Class<?> current = inClass;
+        // getSuperclass() yields null for interfaces and primitives, so guard against it
+        while (current != null && !Object.class.equals(current)) {
+            for (Field field : current.getDeclaredFields()) {
+                if (field.getName().equalsIgnoreCase(fieldname)) {
+                    return field;
+                }
+            }
+            current = current.getSuperclass();
+        }
+        throw new NoSuchFieldException("No field named '" + fieldname + "' in " + inClass + " or its superclasses");
+    }
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
new file mode 100644
index 0000000..3e917ec
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/SerDeUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Serialization helpers for Hadoop {@link Writable}s, plus a variable-length
+ * encoding of {@code long} values: 7 payload bits per byte, least significant
+ * group first, with the high bit of a byte set when another byte follows.
+ */
+public class SerDeUtils {
+
+    /**
+     * Serialize a writable into a freshly allocated byte array.
+     *
+     * @param object
+     *            Writable to serialize
+     * @return the bytes produced by {@code object.write}
+     * @throws IOException
+     *             If the writable fails to write itself
+     */
+    public static byte[] serialize(Writable object) throws IOException {
+        ByteArrayOutputStream bbos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(bbos);
+        object.write(output);
+        return bbos.toByteArray();
+    }
+
+    /**
+     * Populate a writable from the given bytes.
+     *
+     * @param object
+     *            Writable whose {@code readFields} consumes the buffer
+     * @param buffer
+     *            Serialized form to read from
+     * @throws IOException
+     *             If the writable fails to read itself
+     */
+    public static void deserialize(Writable object, byte[] buffer) throws IOException {
+        ByteArrayInputStream bbis = new ByteArrayInputStream(buffer);
+        DataInput input = new DataInputStream(bbis);
+        object.readFields(input);
+    }
+
+    /**
+     * Read a variable-length long written by
+     * {@link #writeVLong(DataOutput, long)}. Bytes are consumed until one with
+     * a clear high bit is seen.
+     *
+     * @param in
+     *            Input to read from
+     * @return the decoded value
+     * @throws IOException
+     *             If reading a byte fails
+     */
+    public static long readVLong(DataInput in) throws IOException {
+        long value = 0L;
+        int shift = 0;
+        byte b;
+        do {
+            b = in.readByte();
+            // low 7 bits are payload, placed above the groups read so far
+            value += ((long) (b & 0x7f)) << shift;
+            shift += 7;
+        } while ((b & 0x80) != 0);
+        return value;
+    }
+
+    /**
+     * Write a long in the variable-length encoding described on this class.
+     * Non-negative values take between 1 and 9 bytes; negative values take 10
+     * bytes.
+     *
+     * @param out
+     *            Output to write to
+     * @param value
+     *            Value to encode
+     * @throws IOException
+     *             If writing a byte fails
+     */
+    public static void writeVLong(DataOutput out, long value) throws IOException {
+        long data = value;
+        do {
+            byte b = (byte) (data & 0x7f);
+            // unsigned shift: a signed shift never reaches zero for negative
+            // values, which would make this loop run forever
+            data >>>= 7;
+            if (data != 0) {
+                b |= 0x80;
+            }
+            out.write(b);
+        } while (data != 0);
+    }
+
+    /**
+     * Decode a variable-length long from a byte array and verify that the
+     * encoding occupies exactly {@code length} bytes.
+     *
+     * @param data
+     *            Array holding the encoded value
+     * @param start
+     *            Index of the first encoded byte
+     * @param length
+     *            Expected number of encoded bytes
+     * @return the decoded value
+     * @throws IllegalStateException
+     *             If the number of bytes consumed differs from {@code length}
+     */
+    public static long readVLong(byte[] data, int start, int length) {
+        int vLen = 0;
+        long value = 0L;
+        int pos = start;
+        while (true) {
+            byte b = data[pos];
+            ++vLen;
+            value += (((long) (b & 0x7f)) << ((vLen - 1) * 7));
+            if ((b & 0x80) == 0) {
+                break;
+            }
+            ++pos;
+        }
+        if (vLen != length) {
+            throw new IllegalStateException("length mismatch -- vLen:" + vLen + " length:" + length);
+        }
+        return value;
+    }
+
+}