minor refactoring and more comments
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2088 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 329e48a..cb27249 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -7,10 +7,27 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/**
+ * This is the abstract class to implement for aggregating the state of all the vertices globally in the graph.
+ * </p>
+ * The global aggregation of vertices in a distributed cluster include two phase:
+ * 1. a local phase which aggregates vertice sent from a single machine and produces
+ * the partially aggregated state;
+ * 2. a final phase which aggregates all partially aggregated states
+ *
+ * @param <I extends Writable> vertex identifier type
+ * @param <E extends Writable> vertex value type
+ * @param <E extends Writable> edge type
+ * @param <M extends Writable> message type
+ * @param <P extends Writable>
+ * the type of the partial aggregate state
+ * @param <F extends Writable> the type of the final aggregate value
+ */
+
@SuppressWarnings("rawtypes")
public abstract class GlobalAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable, P extends Writable, F extends Writable> {
/**
- * initialize combiner
+ * initialize aggregator
*/
public abstract void init();
@@ -26,18 +43,18 @@
/**
* step through all intermediate aggregate result
*
- * @param partialResult partial aggregate value
+ * @param partialResult
+ * partial aggregate value
*/
public abstract void step(P partialResult);
-
/**
* finish partial aggregate
*
* @return the final aggregate value
*/
public abstract P finishPartial();
-
+
/**
* finish final aggregate
*
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
index fd540fd..e4f8ef9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/MessageCombiner.java
@@ -15,24 +15,35 @@
package edu.uci.ics.pregelix.api.graph;
-import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
- * interface to implement for combining of messages sent to the same vertex.
+ * This is the abstract class to implement for combining of messages that are sent to the same vertex.
+ * </p>
+ * This is similar to the concept of Combiner in Hadoop. The combining of messages in a distributed
+ * cluster include two phase:
+ * 1. a local phase which combines messages sent from a single machine and produces
+ * the partially combined message;
+ * 2. a final phase which combines messages at each receiver machine after the repartitioning (shuffling)
+ * and produces the final combined message
*
- * @param <I extends Writable> index
- * @param <M extends Writable> message data
+ * @param <I extends Writable> vertex identifier
+ * @param <M extends Writable> message body type
+ * @param <P extends Writable>
+ * the type of the partially combined messages
*/
@SuppressWarnings("rawtypes")
public abstract class MessageCombiner<I extends WritableComparable, M extends Writable, P extends Writable> {
/**
* initialize combiner
+ *
+ * @param providedMsgList
+ * the provided msg list for user implementation to update, which *should* be returned
+ * by the finishFinal() method
*/
public abstract void init(MsgList providedMsgList);
@@ -40,29 +51,35 @@
* step call for local combiner
*
* @param vertexIndex
+ * the receiver vertex identifier
* @param msg
- * @throws IOException
+ * a single message body
+ * @throws HyracksDataException
*/
- public abstract void step(I vertexIndex, M msg) throws HyracksDataException;
+ public abstract void stepPartial(I vertexIndex, M msg) throws HyracksDataException;
/**
* step call for global combiner
*
* @param vertexIndex
- * @param msg
- * @throws IOException
+ * the receiver vertex identifier
+ * @param partialAggregate
+ * the partial aggregate value
+ * @throws HyracksDataException
*/
- public abstract void step(P partialAggregate) throws HyracksDataException;
+ public abstract void stepFinal(I vertexIndex, P partialAggregate) throws HyracksDataException;
/**
* finish partial combiner
+ *
+ * @return the intermediate combined message of type P
*/
public abstract P finishPartial();
/**
* finish final combiner
*
- * @return Message
+ * @return the final message List
*/
public abstract MsgList<M> finishFinal();
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b0cd533..6856e9a 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -31,16 +31,15 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.pregelix.api.delegate.VertexDelegate;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
/**
* User applications should all inherit {@link Vertex}, and implement their own
- * compute method.
+ * *compute* method.
*
* @param <I>
- * Vertex id type
+ * Vertex identifier type
* @param <V>
* Vertex value type
* @param <E>
@@ -50,393 +49,384 @@
*/
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- private static long superstep = 0;
- /** Class-wide number of vertices */
- private static long numVertices = -1;
- /** Class-wide number of edges */
- private static long numEdges = -1;
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** Map of destination vertices and their edge values */
- private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
- /** If true, do not do anymore computation on this vertex. */
- boolean halt = false;
- /** List of incoming messages from the previous superstep */
- private final List<M> msgList = new ArrayList<M>();
- /** map context */
- private static Mapper.Context context = null;
- /** a delegate for hyracks stuff */
- private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
- this);
- /** this vertex is updated or not */
- private boolean updated = false;
- /** has outgoing messages */
- private boolean hasMessage = false;
+ implements Writable {
+ private static long superstep = 0;
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** Map of destination vertices and their edge values */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context */
+ private static Mapper.Context context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
- /**
- * use object pool for re-using objects
- */
- private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
- private List<M> msgPool = new ArrayList<M>();
- private List<V> valuePool = new ArrayList<V>();
- private int usedEdge = 0;
- private int usedMessage = 0;
- private int usedValue = 0;
+ /**
+ * use object pool for re-using objects
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
- /**
- * The key method that users need to implement
- *
- * @param msgIterator
- * an iterator of incoming messages
- */
- public abstract void compute(Iterator<M> msgIterator);
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ * an iterator of incoming messages
+ */
+ public abstract void compute(Iterator<M> msgIterator);
- /**
- * Add an edge for the vertex.
- *
- * @param targetVertexId
- * @param edgeValue
- * @return successful or not
- */
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- Edge<I, E> edge = this.allocateEdge();
- edge.setDestVertexId(targetVertexId);
- edge.setEdgeValue(edgeValue);
- destEdgeList.add(edge);
- return true;
- }
+ /**
+ * Add an edge for the vertex.
+ *
+ * @param targetVertexId
+ * @param edgeValue
+ * @return successful or not
+ */
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ return true;
+ }
- /**
- * Initialize a new vertex
- *
- * @param vertexId
- * @param vertexValue
- * @param edges
- * @param messages
- */
- public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
- List<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- destEdgeList.clear();
- if (edges != null && !edges.isEmpty()) {
- for (Map.Entry<I, E> entry : edges.entrySet()) {
- destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
- .getValue()));
- }
- }
- if (messages != null && !messages.isEmpty()) {
- msgList.addAll(messages);
- }
- }
+ /**
+ * Initialize a new vertex
+ *
+ * @param vertexId
+ * @param vertexValue
+ * @param edges
+ * @param messages
+ */
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
- /**
- * reset a vertex object: clear its internal states
- */
- public void reset() {
- usedEdge = 0;
- usedMessage = 0;
- usedValue = 0;
- }
+ /**
+ * reset a vertex object: clear its internal states
+ */
+ public void reset() {
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ }
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext()
- .getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedEdge);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedEdge);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext()
- .getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedEdge);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
- /**
- * Set the vertex id
- *
- * @param vertexId
- */
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- delegate.setVertexId(vertexId);
- }
+ /**
+ * Set the vertex id
+ *
+ * @param vertexId
+ */
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
- /**
- * Get the vertex id
- *
- * @return vertex id
- */
- public final I getVertexId() {
- return vertexId;
- }
+ /**
+ * Get the vertex id
+ *
+ * @return vertex id
+ */
+ public final I getVertexId() {
+ return vertexId;
+ }
- /**
- * Set the global superstep for all the vertices (internal use)
- *
- * @param superstep
- * New superstep
- */
- public static void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
- public static long getCurrentSuperstep() {
- return superstep;
- }
+ public static long getCurrentSuperstep() {
+ return superstep;
+ }
- public final long getSuperstep() {
- return superstep;
- }
+ public final long getSuperstep() {
+ return superstep;
+ }
- public final V getVertexValue() {
- return vertexValue;
- }
+ public final V getVertexValue() {
+ return vertexValue;
+ }
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- this.updated = true;
- }
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
- public final long getNumVertices() {
- return numVertices;
- }
+ public final long getNumVertices() {
+ return numVertices;
+ }
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
- public final long getNumEdges() {
- return numEdges;
- }
+ public final long getNumEdges() {
+ return numEdges;
+ }
- /***
- * Send a message to a specific vertex
- *
- * @param id
- * the receiver vertex id
- * @param msg
- * the message
- */
- public final void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- delegate.sendMsg(id, msg);
- this.hasMessage = true;
- }
+ /***
+ * Send a message to a specific vertex
+ *
+ * @param id
+ * the receiver vertex id
+ * @param msg
+ * the message
+ */
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
- /**
- * Send a message to all direct outgoing neighbors
- *
- * @param msg
- * the message
- */
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (Edge<I, E> edge : destEdgeList) {
- sendMsg(edge.getDestVertexId(), msg);
- }
- }
+ /**
+ * Send a message to all direct outgoing neighbors
+ *
+ * @param msg
+ * the message
+ */
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
- /**
- * Vote to halt. Once all vertex vote to halt and no more messages, a
- * Pregelix job will terminate.
- */
- public final void voteToHalt() {
- halt = true;
- }
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ */
+ public final void voteToHalt() {
+ halt = true;
+ }
- /**
- * @return the vertex is halted (true) or not (false)
- */
- public final boolean isHalted() {
- return halt;
- }
+ /**
+ * @return the vertex is halted (true) or not (false)
+ */
+ public final boolean isHalted() {
+ return halt;
+ }
- @Override
- final public void readFields(DataInput in) throws IOException {
- reset();
- if (vertexId == null)
- vertexId = BspUtils.<I> createVertexIndex(getContext()
- .getConfiguration());
- vertexId.readFields(in);
- delegate.setVertexId(vertexId);
- boolean hasVertexValue = in.readBoolean();
- if (hasVertexValue) {
- vertexValue = allocateValue();
- vertexValue.readFields(in);
- delegate.setVertex(this);
- }
- destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < edgeMapSize; ++i) {
- Edge<I, E> edge = allocateEdge();
- edge.setConf(getContext().getConfiguration());
- edge.readFields(in);
- addEdge(edge);
- }
- msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < msgListSize; ++i) {
- M msg = allocateMessage();
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- updated = false;
- hasMessage = false;
- }
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ reset();
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration());
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false;
+ hasMessage = false;
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- SerDeUtils.writeVLong(out, destEdgeList.size());
- for (Edge<I, E> edge : destEdgeList) {
- edge.write(out);
- }
- SerDeUtils.writeVLong(out, msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
- private boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- destEdgeList.add(edge);
- return true;
- }
+ private boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ destEdgeList.add(edge);
+ return true;
+ }
- /**
- * Get the list of incoming messages
- *
- * @return the list of messages
- */
- public List<M> getMsgList() {
- return msgList;
- }
+ /**
+ * Get the list of incoming messages
+ *
+ * @return the list of messages
+ */
+ public List<M> getMsgList() {
+ return msgList;
+ }
- /**
- * Get outgoing edge list
- *
- * @return a list of outgoing edges
- */
- public List<Edge<I, E>> getEdges() {
- return this.destEdgeList;
- }
+ /**
+ * Get outgoing edge list
+ *
+ * @return a list of outgoing edges
+ */
+ public List<Edge<I, E>> getEdges() {
+ return this.destEdgeList;
+ }
- public final Mapper<?, ?, ?, ?>.Context getContext() {
- return context;
- }
+ public final Mapper<?, ?, ?, ?>.Context getContext() {
+ return context;
+ }
- public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
- Vertex.context = context;
- }
+ public final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ Vertex.context = context;
+ }
- @SuppressWarnings("unchecked")
- public String toString() {
- Collections.sort(destEdgeList);
- StringBuffer edgeBuffer = new StringBuffer();
- edgeBuffer.append("(");
- for (Edge<I, E> edge : destEdgeList) {
- edgeBuffer.append(edge.getDestVertexId()).append(",");
- }
- edgeBuffer.append(")");
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
- + ", edges=" + edgeBuffer + ")";
- }
+ @SuppressWarnings("unchecked")
+ public String toString() {
+ Collections.sort(destEdgeList);
+ StringBuffer edgeBuffer = new StringBuffer();
+ edgeBuffer.append("(");
+ for (Edge<I, E> edge : destEdgeList) {
+ edgeBuffer.append(edge.getDestVertexId()).append(",");
+ }
+ edgeBuffer.append(")");
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+ }
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
- public boolean hasUpdate() {
- return this.updated;
- }
+ public boolean hasUpdate() {
+ return this.updated;
+ }
- public boolean hasMessage() {
- return this.hasMessage;
- }
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
- public int getNumOutEdges() {
- return destEdgeList.size();
- }
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
- @SuppressWarnings("unchecked")
- public void sortEdges() {
- Collections.sort((List) destEdgeList);
- }
+ @SuppressWarnings("unchecked")
+ public void sortEdges() {
+ Collections.sort((List) destEdgeList);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
similarity index 93%
rename from pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
rename to pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index ec84fc1..7267f30 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/delegate/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.pregelix.api.delegate;
+package edu.uci.ics.pregelix.api.graph;
import java.io.DataOutput;
import java.io.IOException;
@@ -25,12 +25,10 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.pregelix.api.graph.MsgList;
-import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
@SuppressWarnings("rawtypes")
-public class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
+class VertexDelegate<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
/** Vertex id */
private I vertexId = null;
/** Vertex value */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
index ccf19ee..1468431 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultMessageCombiner.java
@@ -19,12 +19,12 @@
}
@Override
- public void step(I vertexIndex, M msg) throws HyracksDataException {
+ public void stepPartial(I vertexIndex, M msg) throws HyracksDataException {
msgList.addElement(msg);
}
@Override
- public void step(MsgList partialAggregate) throws HyracksDataException {
+ public void stepFinal(I vertexIndex, MsgList partialAggregate) throws HyracksDataException {
msgList.addAllElements(partialAggregate);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
index 775c3ed..7c2d8c8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ConnectedComponentsVertex.java
@@ -50,7 +50,7 @@
private MsgList<VLongWritable> msgList;
@Override
- public void step(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
+ public void stepPartial(VLongWritable vertexIndex, VLongWritable msg) throws HyracksDataException {
long value = msg.get();
if (min > value)
min = value;
@@ -64,7 +64,7 @@
}
@Override
- public void step(VLongWritable partialAggregate) throws HyracksDataException {
+ public void stepFinal(VLongWritable vertexIndex, VLongWritable partialAggregate) throws HyracksDataException {
if (min > partialAggregate.get())
min = partialAggregate.get();
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 7e21245..3593a04 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -71,7 +71,7 @@
}
@Override
- public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+ public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
sum += msg.get();
}
@@ -82,7 +82,7 @@
}
@Override
- public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+ public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
sum += partialAggregate.get();
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
index 8c9c1ef..0396beb 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachibilityVertex.java
@@ -60,13 +60,13 @@
}
@Override
- public void step(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
+ public void stepPartial(VLongWritable vertexIndex, ByteWritable msg) throws HyracksDataException {
int newState = agg.get() | msg.get();
agg.set((byte) newState);
}
@Override
- public void step(ByteWritable partialAggregate) throws HyracksDataException {
+ public void stepFinal(VLongWritable vertexIndex, ByteWritable partialAggregate) throws HyracksDataException {
int newState = agg.get() | partialAggregate.get();
agg.set((byte) newState);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
index 89ea951..0a535a6 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ShortestPathsVertex.java
@@ -46,7 +46,7 @@
private MsgList<DoubleWritable> msgList;
@Override
- public void step(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
+ public void stepPartial(VLongWritable vertexIndex, DoubleWritable msg) throws HyracksDataException {
double value = msg.get();
if (min > value)
min = value;
@@ -66,7 +66,7 @@
}
@Override
- public void step(DoubleWritable partialAggregate) throws HyracksDataException {
+ public void stepFinal(VLongWritable vertexIndex, DoubleWritable partialAggregate) throws HyracksDataException {
double value = partialAggregate.get();
if (min > value)
min = value;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 6962aa5..1813dcc 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -93,9 +93,9 @@
}
value.readFields(valueInput);
if (!partialAggAsInput) {
- combiner.step(key, value);
+ combiner.stepPartial(key, value);
} else {
- combiner.step(value);
+ combiner.stepFinal(key, value);
}
} catch (IOException e) {
throw new HyracksDataException(e);