fix the halt-activate issue reported by Anbang
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3269 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index cd49184..e51d4bc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,525 +49,524 @@
*/
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- private static long superstep = 0;
- /** Class-wide number of vertices */
- private static long numVertices = -1;
- /** Class-wide number of edges */
- private static long numEdges = -1;
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** Map of destination vertices and their edge values */
- private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
- /** If true, do not do anymore computation on this vertex. */
- boolean halt = false;
- /** List of incoming messages from the previous superstep */
- private final List<M> msgList = new ArrayList<M>();
- /** map context */
- private static TaskAttemptContext context = null;
- /** a delegate for hyracks stuff */
- private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
- this);
- /** this vertex is updated or not */
- private boolean updated = false;
- /** has outgoing messages */
- private boolean hasMessage = false;
- /** created new vertex */
- private boolean createdNewLiveVertex = false;
+ implements Writable {
+ private static long superstep = 0;
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** Map of destination vertices and their edge values */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context */
+ private static TaskAttemptContext context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
+ /** created new vertex */
+ private boolean createdNewLiveVertex = false;
- /**
- * use object pool for re-using objects
- */
- private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
- private List<M> msgPool = new ArrayList<M>();
- private List<V> valuePool = new ArrayList<V>();
- private int usedEdge = 0;
- private int usedMessage = 0;
- private int usedValue = 0;
+ /**
+ * use object pool for re-using objects
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
- /**
- * The key method that users need to implement
- *
- * @param msgIterator
- * an iterator of incoming messages
- */
- public abstract void compute(Iterator<M> msgIterator);
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ * an iterator of incoming messages
+ */
+ public abstract void compute(Iterator<M> msgIterator);
- /**
- * Add an edge for the vertex.
- *
- * @param targetVertexId
- * @param edgeValue
- * @return successful or not
- */
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- Edge<I, E> edge = this.allocateEdge();
- edge.setDestVertexId(targetVertexId);
- edge.setEdgeValue(edgeValue);
- destEdgeList.add(edge);
- updated = true;
- return true;
- }
+ /**
+ * Add an edge for the vertex.
+ *
+ * @param targetVertexId
+ * @param edgeValue
+ * @return successful or not
+ */
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ updated = true;
+ return true;
+ }
- /**
- * Initialize a new vertex
- *
- * @param vertexId
- * @param vertexValue
- * @param edges
- * @param messages
- */
- public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
- List<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- destEdgeList.clear();
- if (edges != null && !edges.isEmpty()) {
- for (Map.Entry<I, E> entry : edges.entrySet()) {
- destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
- .getValue()));
- }
- }
- if (messages != null && !messages.isEmpty()) {
- msgList.addAll(messages);
- }
- }
+ /**
+ * Initialize a new vertex
+ *
+ * @param vertexId
+ * @param vertexValue
+ * @param edges
+ * @param messages
+ */
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
- /**
- * reset a vertex object: clear its internal states
- */
- public void reset() {
- usedEdge = 0;
- usedMessage = 0;
- usedValue = 0;
- updated = false;
- }
+ /**
+ * reset a vertex object: clear its internal states
+ */
+ public void reset() {
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ updated = false;
+ }
- /**
- * Set the vertex id
- *
- * @param vertexId
- */
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- delegate.setVertexId(vertexId);
- }
+ /**
+ * Set the vertex id
+ *
+ * @param vertexId
+ */
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
- /**
- * Get the vertex id
- *
- * @return vertex id
- */
- public final I getVertexId() {
- return vertexId;
- }
+ /**
+ * Get the vertex id
+ *
+ * @return vertex id
+ */
+ public final I getVertexId() {
+ return vertexId;
+ }
- /**
- * Get the vertex value
- *
- * @return the vertex value
- */
- public final V getVertexValue() {
- return vertexValue;
- }
+ /**
+ * Get the vertex value
+ *
+ * @return the vertex value
+ */
+ public final V getVertexValue() {
+ return vertexValue;
+ }
- /**
- * Set the vertex value
- *
- * @param vertexValue
- */
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- this.updated = true;
- }
+ /**
+ * Set the vertex value
+ *
+ * @param vertexValue
+ */
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
- /***
- * Send a message to a specific vertex
- *
- * @param id
- * the receiver vertex id
- * @param msg
- * the message
- */
- public final void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- delegate.sendMsg(id, msg);
- this.hasMessage = true;
- }
+ /***
+ * Send a message to a specific vertex
+ *
+ * @param id
+ * the receiver vertex id
+ * @param msg
+ * the message
+ */
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
- /**
- * Send a message to all direct outgoing neighbors
- *
- * @param msg
- * the message
- */
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (Edge<I, E> edge : destEdgeList) {
- sendMsg(edge.getDestVertexId(), msg);
- }
- }
+ /**
+ * Send a message to all direct outgoing neighbors
+ *
+ * @param msg
+ * the message
+ */
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
- /**
- * Vote to halt. Once all vertex vote to halt and no more messages, a
- * Pregelix job will terminate.
- */
- public final void voteToHalt() {
- halt = true;
- updated = true;
- }
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ */
+ public final void voteToHalt() {
+ halt = true;
+ updated = true;
+ }
- /**
- * @return the vertex is halted (true) or not (false)
- */
- public final boolean isHalted() {
- return halt;
- }
+ /**
+ * Activate a halted vertex such that it is alive again.
+ */
+ public final void activate() {
+ halt = false;
+ updated = true;
+ }
- @Override
- final public void readFields(DataInput in) throws IOException {
- reset();
- if (vertexId == null)
- vertexId = BspUtils.<I> createVertexIndex(getContext()
- .getConfiguration());
- vertexId.readFields(in);
- delegate.setVertexId(vertexId);
- boolean hasVertexValue = in.readBoolean();
+ /**
+ * @return the vertex is halted (true) or not (false)
+ */
+ public final boolean isHalted() {
+ return halt;
+ }
- if (hasVertexValue) {
- vertexValue = allocateValue();
- vertexValue.readFields(in);
- delegate.setVertex(this);
- }
- destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < edgeMapSize; ++i) {
- Edge<I, E> edge = allocateEdge();
- edge.setConf(getContext().getConfiguration());
- edge.readFields(in);
- addEdge(edge);
- }
- msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < msgListSize; ++i) {
- M msg = allocateMessage();
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- updated = false;
- hasMessage = false;
- createdNewLiveVertex = false;
- }
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ reset();
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
- @Override
- public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- SerDeUtils.writeVLong(out, destEdgeList.size());
- for (Edge<I, E> edge : destEdgeList) {
- edge.write(out);
- }
- SerDeUtils.writeVLong(out, msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration());
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false;
+ hasMessage = false;
+ createdNewLiveVertex = false;
+ }
- /**
- * Get the list of incoming messages
- *
- * @return the list of messages
- */
- public List<M> getMsgList() {
- return msgList;
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
- /**
- * Get outgoing edge list
- *
- * @return a list of outgoing edges
- */
- public List<Edge<I, E>> getEdges() {
- return this.destEdgeList;
- }
+ /**
+ * Get the list of incoming messages
+ *
+ * @return the list of messages
+ */
+ public List<M> getMsgList() {
+ return msgList;
+ }
- @Override
- @SuppressWarnings("unchecked")
- public String toString() {
- Collections.sort(destEdgeList);
- StringBuffer edgeBuffer = new StringBuffer();
- edgeBuffer.append("(");
- for (Edge<I, E> edge : destEdgeList) {
- edgeBuffer.append(edge.getDestVertexId()).append(",");
- }
- edgeBuffer.append(")");
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
- + ", edges=" + edgeBuffer + ")";
- }
+ /**
+ * Get outgoing edge list
+ *
+ * @return a list of outgoing edges
+ */
+ public List<Edge<I, E>> getEdges() {
+ return this.destEdgeList;
+ }
- /**
- * Get the number of outgoing edges
- *
- * @return the number of outging edges
- */
- public int getNumOutEdges() {
- return destEdgeList.size();
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public String toString() {
+ Collections.sort(destEdgeList);
+ StringBuffer edgeBuffer = new StringBuffer();
+ edgeBuffer.append("(");
+ for (Edge<I, E> edge : destEdgeList) {
+ edgeBuffer.append(edge.getDestVertexId()).append(",");
+ }
+ edgeBuffer.append(")");
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
+ /**
+ * Get the number of outgoing edges
+ *
+ * @return the number of outging edges
+ */
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasUpdate() {
- return this.updated;
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasMessage() {
- return this.hasMessage;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasUpdate() {
+ return this.updated;
+ }
- /**
- * Pregelix internal use only
- */
- public boolean createdNewLiveVertex() {
- return this.createdNewLiveVertex;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
- /**
- * sort the edges
- */
- @SuppressWarnings("unchecked")
- public void sortEdges() {
- updated = true;
- Collections.sort(destEdgeList);
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean createdNewLiveVertex() {
+ return this.createdNewLiveVertex;
+ }
- /**
- * Allocate a new edge from the edge pool
- */
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
+ /**
+ * sort the edges
+ */
+ @SuppressWarnings("unchecked")
+ public void sortEdges() {
+ updated = true;
+ Collections.sort(destEdgeList);
+ }
- /**
- * Allocate a new message from the message pool
- */
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext()
- .getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
+ /**
+ * Allocate a new edge from the edge pool
+ */
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
- /**
- * Set the global superstep for all the vertices (internal use)
- *
- * @param superstep
- * New superstep
- */
- public static final void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
+ /**
+ * Allocate a new message from the message pool
+ */
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedEdge);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
- /**
- * Add an outgoing edge into the vertex
- *
- * @param edge
- * the edge to be added
- * @return true if the edge list changed as a result of this call
- */
- public boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- updated = true;
- return destEdgeList.add(edge);
- }
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static final void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
- /**
- * remove an outgoing edge in the graph
- *
- * @param edge
- * the edge to be removed
- * @return true if the edge is in the edge list of the vertex
- */
- public boolean removeEdge(Edge<I, E> edge) {
- updated = true;
- return destEdgeList.remove(edge);
- }
+ /**
+ * Add an outgoing edge into the vertex
+ *
+ * @param edge
+ * the edge to be added
+ * @return true if the edge list changed as a result of this call
+ */
+ public boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ updated = true;
+ return destEdgeList.add(edge);
+ }
- /**
- * Add a new vertex into the graph
- *
- * @param vertexId
- * the vertex id
- * @param vertex
- * the vertex
- */
- public final void addVertex(I vertexId, Vertex vertex) {
- createdNewLiveVertex |= !vertex.isHalted();
- delegate.addVertex(vertexId, vertex);
- }
+ /**
+ * remove an outgoing edge in the graph
+ *
+ * @param edge
+ * the edge to be removed
+ * @return true if the edge is in the edge list of the vertex
+ */
+ public boolean removeEdge(Edge<I, E> edge) {
+ updated = true;
+ return destEdgeList.remove(edge);
+ }
- /**
- * Delete a vertex from id
- *
- * @param vertexId
- * the vertex id
- */
- public final void deleteVertex(I vertexId) {
- delegate.deleteVertex(vertexId);
- }
+ /**
+ * Add a new vertex into the graph
+ *
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
+ */
+ public final void addVertex(I vertexId, Vertex vertex) {
+ createdNewLiveVertex |= !vertex.isHalted();
+ delegate.addVertex(vertexId, vertex);
+ }
- /**
- * Allocate a vertex value from the object pool
- *
- * @return a vertex value instance
- */
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedValue);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext()
- .getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
+ /**
+ * Delete a vertex from id
+ *
+ * @param vertexId
+ * the vertex id
+ */
+ public final void deleteVertex(I vertexId) {
+ delegate.deleteVertex(vertexId);
+ }
- /**
- * Get the current global superstep number
- *
- * @return the current superstep number
- */
- public static final long getSuperstep() {
- return superstep;
- }
+ /**
+ * Allocate a vertex value from the object pool
+ *
+ * @return a vertex value instance
+ */
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static final void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
+ /**
+ * Get the current global superstep number
+ *
+ * @return the current superstep number
+ */
+ public static final long getSuperstep() {
+ return superstep;
+ }
- /**
- * Get the number of vertexes in the graph
- *
- * @return the number of vertexes in the graph
- */
- public static final long getNumVertices() {
- return numVertices;
- }
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static final void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
+ /**
+ * Get the number of vertexes in the graph
+ *
+ * @return the number of vertexes in the graph
+ */
+ public static final long getNumVertices() {
+ return numVertices;
+ }
- /**
- * Get the number of edges from this graph
- *
- * @return the number of edges in the graph
- */
- public static final long getNumEdges() {
- return numEdges;
- }
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
- /**
- * Pregelix internal use only
- */
- public static final TaskAttemptContext getContext() {
- return context;
- }
+ /**
+ * Get the number of edges from this graph
+ *
+ * @return the number of edges in the graph
+ */
+ public static final long getNumEdges() {
+ return numEdges;
+ }
- /**
- * Pregelix internal use only
- *
- * @param context
- */
- public static final void setContext(TaskAttemptContext context) {
- Vertex.context = context;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public static final TaskAttemptContext getContext() {
+ return context;
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param context
+ */
+ public static final void setContext(TaskAttemptContext context) {
+ Vertex.context = context;
+ }
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..af95064 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -44,229 +44,249 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
- private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
- public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
- this.confFactory = confFactory;
- }
+ public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
- @Override
- public IUpdateFunction createFunction() {
- return new IUpdateFunction() {
- // for writing intermediate data
- private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
- private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
- private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
+ 1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
+ 1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
- // for writing out to message channel
- private IFrameWriter writerMsg;
- private FrameTupleAppender appenderMsg;
- private ByteBuffer bufferMsg;
+ // for writing out to message channel
+ private IFrameWriter writerMsg;
+ private FrameTupleAppender appenderMsg;
+ private ByteBuffer bufferMsg;
- // for writing out to alive message channel
- private IFrameWriter writerAlive;
- private FrameTupleAppender appenderAlive;
- private ByteBuffer bufferAlive;
- private boolean pushAlive;
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+ private boolean pushAlive;
- // for writing out termination detection control channel
- private IFrameWriter writerTerminate;
- private FrameTupleAppender appenderTerminate;
- private ByteBuffer bufferTerminate;
- private boolean terminate = true;
+ // for writing out termination detection control channel
+ private IFrameWriter writerTerminate;
+ private FrameTupleAppender appenderTerminate;
+ private ByteBuffer bufferTerminate;
+ private boolean terminate = true;
- // for writing out termination detection control channel
- private IFrameWriter writerGlobalAggregate;
- private FrameTupleAppender appenderGlobalAggregate;
- private ByteBuffer bufferGlobalAggregate;
- private GlobalAggregator aggregator;
+ // for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
- // for writing out to insert vertex channel
- private IFrameWriter writerInsert;
- private FrameTupleAppender appenderInsert;
- private ByteBuffer bufferInsert;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
- // for writing out to delete vertex channel
- private IFrameWriter writerDelete;
- private FrameTupleAppender appenderDelete;
- private ByteBuffer bufferDelete;
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
- private Vertex vertex;
- private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
- private DataOutput output = new DataOutputStream(bbos);
+ private Vertex vertex;
+ private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+ private DataOutput output = new DataOutputStream(bbos);
- private ArrayIterator msgIterator = new ArrayIterator();
- private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
- private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
- private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
- private Configuration conf;
- private boolean dynamicStateLength;
+ private ArrayIterator msgIterator = new ArrayIterator();
+ private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+ private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+ private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
- @Override
- public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
- throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
- this.aggregator = BspUtils.createGlobalAggregator(conf);
- this.aggregator.init();
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
+ IFrameWriter... writers) throws HyracksDataException {
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils
+ .getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
+ this.aggregator.init();
- this.writerMsg = writers[0];
- this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderMsg.reset(bufferMsg, true);
- this.writers.add(writerMsg);
- this.appenders.add(appenderMsg);
+ this.writerMsg = writers[0];
+ this.bufferMsg = ctx.allocateFrame();
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg.reset(bufferMsg, true);
+ this.writers.add(writerMsg);
+ this.appenders.add(appenderMsg);
- this.writerTerminate = writers[1];
- this.bufferTerminate = ctx.allocateFrame();
- this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderTerminate.reset(bufferTerminate, true);
+ this.writerTerminate = writers[1];
+ this.bufferTerminate = ctx.allocateFrame();
+ this.appenderTerminate = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderTerminate.reset(bufferTerminate, true);
- this.writerGlobalAggregate = writers[2];
- this.bufferGlobalAggregate = ctx.allocateFrame();
- this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- this.writerInsert = writers[3];
- this.bufferInsert = ctx.allocateFrame();
- this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderInsert.reset(bufferInsert, true);
- this.writers.add(writerInsert);
- this.appenders.add(appenderInsert);
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
- this.writerDelete = writers[4];
- this.bufferDelete = ctx.allocateFrame();
- this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderDelete.reset(bufferDelete, true);
- this.writers.add(writerDelete);
- this.appenders.add(appenderDelete);
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
- if (writers.length > 5) {
- this.writerAlive = writers[5];
- this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderAlive.reset(bufferAlive, true);
- this.pushAlive = true;
- this.writers.add(writerAlive);
- this.appenders.add(appenderAlive);
- }
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ this.pushAlive = true;
+ this.writers.add(writerAlive);
+ this.appenders.add(appenderAlive);
+ }
- tbs.add(tbMsg);
- tbs.add(tbInsert);
- tbs.add(tbDelete);
- tbs.add(tbAlive);
- }
+ tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
+ tbs.add(tbAlive);
+ }
- @Override
- public void process(Object[] tuple) throws HyracksDataException {
- // vertex Id, msg content List, vertex Id, vertex
- tbMsg.reset();
- tbAlive.reset();
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ // vertex Id, msg content List, vertex Id, vertex
+ tbMsg.reset();
+ tbAlive.reset();
- vertex = (Vertex) tuple[3];
- vertex.setOutputWriters(writers);
- vertex.setOutputAppenders(appenders);
- vertex.setOutputTupleBuilders(tbs);
+ vertex = (Vertex) tuple[3];
+ vertex.setOutputWriters(writers);
+ vertex.setOutputAppenders(appenders);
+ vertex.setOutputTupleBuilders(tbs);
- ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
- msgContentList.reset(msgIterator);
+ ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+ msgContentList.reset(msgIterator);
- if (!msgIterator.hasNext() && vertex.isHalted())
- return;
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
+ return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
- try {
- vertex.compute(msgIterator);
- vertex.finishCompute();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ try {
+ vertex.compute(msgIterator);
+ vertex.finishCompute();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
- /**
- * this partition should not terminate
- */
- if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
- terminate = false;
+ /**
+ * this partition should not terminate
+ */
+ if (terminate
+ && (!vertex.isHalted() || vertex.hasMessage() || vertex
+ .createdNewLiveVertex()))
+ terminate = false;
- aggregator.step(vertex);
- }
+ aggregator.step(vertex);
+ }
- @Override
- public void close() throws HyracksDataException {
- FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
- FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
- FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
- if (pushAlive)
- FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
- if (!terminate) {
- writeOutTerminationState();
- }
+ if (pushAlive)
+ FrameTupleUtils
+ .flushTuplesFinal(appenderAlive, writerAlive);
+ if (!terminate) {
+ writeOutTerminationState();
+ }
- /** write out global aggregate value */
- writeOutGlobalAggregate();
- }
+ /** write out global aggregate value */
+ writeOutGlobalAggregate();
+ }
- private void writeOutGlobalAggregate() throws HyracksDataException {
- try {
- /**
- * get partial aggregate result and flush to the final
- * aggregator
- */
- Writable agg = aggregator.finishPartial();
- agg.write(tbGlobalAggregate.getDataOutput());
- tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final
+ * aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(
+ tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0,
+ tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
+ writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void writeOutTerminationState() throws HyracksDataException {
- try {
- tbTerminate.getDataOutput().writeLong(0);
- tbTerminate.addFieldEndOffset();
- appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
- tbTerminate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutTerminationState() throws HyracksDataException {
+ try {
+ tbTerminate.getDataOutput().writeLong(0);
+ tbTerminate.addFieldEndOffset();
+ appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
+ tbTerminate.getByteArray(), 0,
+ tbTerminate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderTerminate,
+ writerTerminate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
- try {
- if (vertex != null && vertex.hasUpdate()) {
- if (!dynamicStateLength) {
- // in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
- } else {
- // write the vertex id
- DataOutput tbOutput = cloneUpdateTb.getDataOutput();
- vertex.getVertexId().write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
+ @Override
+ public void update(ITupleReference tupleRef,
+ ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException {
+ try {
+ if (vertex != null && vertex.hasUpdate()) {
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
- // write the vertex value
- vertex.write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
- }
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+ }
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 0cf64a0..a241c9c 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -43,234 +43,255 @@
import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
- private static final long serialVersionUID = 1L;
- private final IConfigurationFactory confFactory;
+public class StartComputeUpdateFunctionFactory implements
+ IUpdateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
- public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
- this.confFactory = confFactory;
- }
+ public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+ this.confFactory = confFactory;
+ }
- @Override
- public IUpdateFunction createFunction() {
- return new IUpdateFunction() {
- // for writing intermediate data
- private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
- private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
- private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
- private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
+ 1);
+ private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
+ 1);
+ private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+ private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
- // for writing out to message channel
- private IFrameWriter writerMsg;
- private FrameTupleAppender appenderMsg;
- private ByteBuffer bufferMsg;
+ // for writing out to message channel
+ private IFrameWriter writerMsg;
+ private FrameTupleAppender appenderMsg;
+ private ByteBuffer bufferMsg;
- // for writing out to alive message channel
- private IFrameWriter writerAlive;
- private FrameTupleAppender appenderAlive;
- private ByteBuffer bufferAlive;
- private boolean pushAlive;
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+ private boolean pushAlive;
- // for writing out termination detection control channel
- private IFrameWriter writerGlobalAggregate;
- private FrameTupleAppender appenderGlobalAggregate;
- private ByteBuffer bufferGlobalAggregate;
- private GlobalAggregator aggregator;
+ // for writing out termination detection control channel
+ private IFrameWriter writerGlobalAggregate;
+ private FrameTupleAppender appenderGlobalAggregate;
+ private ByteBuffer bufferGlobalAggregate;
+ private GlobalAggregator aggregator;
- // for writing out the global aggregate
- private IFrameWriter writerTerminate;
- private FrameTupleAppender appenderTerminate;
- private ByteBuffer bufferTerminate;
- private boolean terminate = true;
+ // for writing out the global aggregate
+ private IFrameWriter writerTerminate;
+ private FrameTupleAppender appenderTerminate;
+ private ByteBuffer bufferTerminate;
+ private boolean terminate = true;
- // for writing out to insert vertex channel
- private IFrameWriter writerInsert;
- private FrameTupleAppender appenderInsert;
- private ByteBuffer bufferInsert;
+ // for writing out to insert vertex channel
+ private IFrameWriter writerInsert;
+ private FrameTupleAppender appenderInsert;
+ private ByteBuffer bufferInsert;
- // for writing out to delete vertex channel
- private IFrameWriter writerDelete;
- private FrameTupleAppender appenderDelete;
- private ByteBuffer bufferDelete;
+ // for writing out to delete vertex channel
+ private IFrameWriter writerDelete;
+ private FrameTupleAppender appenderDelete;
+ private ByteBuffer bufferDelete;
- // dummy empty msgList
- private MsgList msgList = new MsgList();
- private ArrayIterator msgIterator = new ArrayIterator();
+ // dummy empty msgList
+ private MsgList msgList = new MsgList();
+ private ArrayIterator msgIterator = new ArrayIterator();
- private Vertex vertex;
- private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
- private DataOutput output = new DataOutputStream(bbos);
+ private Vertex vertex;
+ private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+ private DataOutput output = new DataOutputStream(bbos);
- private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
- private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
- private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
- private Configuration conf;
- private boolean dynamicStateLength;
+ private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+ private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+ private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
+ private boolean dynamicStateLength;
- @Override
- public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
- throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
- this.aggregator = BspUtils.createGlobalAggregator(conf);
- this.aggregator.init();
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
+ IFrameWriter... writers) throws HyracksDataException {
+ this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils
+ .getDynamicVertexValueSize(conf);
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
+ this.aggregator.init();
- this.writerMsg = writers[0];
- this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderMsg.reset(bufferMsg, true);
- this.writers.add(writerMsg);
- this.appenders.add(appenderMsg);
+ this.writerMsg = writers[0];
+ this.bufferMsg = ctx.allocateFrame();
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg.reset(bufferMsg, true);
+ this.writers.add(writerMsg);
+ this.appenders.add(appenderMsg);
- this.writerTerminate = writers[1];
- this.bufferTerminate = ctx.allocateFrame();
- this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderTerminate.reset(bufferTerminate, true);
+ this.writerTerminate = writers[1];
+ this.bufferTerminate = ctx.allocateFrame();
+ this.appenderTerminate = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderTerminate.reset(bufferTerminate, true);
- this.writerGlobalAggregate = writers[2];
- this.bufferGlobalAggregate = ctx.allocateFrame();
- this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+ this.writerGlobalAggregate = writers[2];
+ this.bufferGlobalAggregate = ctx.allocateFrame();
+ this.appenderGlobalAggregate = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
- this.writerInsert = writers[3];
- this.bufferInsert = ctx.allocateFrame();
- this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderInsert.reset(bufferInsert, true);
- this.writers.add(writerInsert);
- this.appenders.add(appenderInsert);
+ this.writerInsert = writers[3];
+ this.bufferInsert = ctx.allocateFrame();
+ this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderInsert.reset(bufferInsert, true);
+ this.writers.add(writerInsert);
+ this.appenders.add(appenderInsert);
- this.writerDelete = writers[4];
- this.bufferDelete = ctx.allocateFrame();
- this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderDelete.reset(bufferDelete, true);
- this.writers.add(writerDelete);
- this.appenders.add(appenderDelete);
+ this.writerDelete = writers[4];
+ this.bufferDelete = ctx.allocateFrame();
+ this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderDelete.reset(bufferDelete, true);
+ this.writers.add(writerDelete);
+ this.appenders.add(appenderDelete);
- if (writers.length > 5) {
- this.writerAlive = writers[5];
- this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
- this.appenderAlive.reset(bufferAlive, true);
- this.pushAlive = true;
- this.writers.add(writerAlive);
- this.appenders.add(appenderAlive);
- }
- msgList.reset(msgIterator);
+ if (writers.length > 5) {
+ this.writerAlive = writers[5];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(
+ ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ this.pushAlive = true;
+ this.writers.add(writerAlive);
+ this.appenders.add(appenderAlive);
+ }
+ msgList.reset(msgIterator);
- tbs.add(tbMsg);
- tbs.add(tbInsert);
- tbs.add(tbDelete);
- tbs.add(tbAlive);
- }
+ tbs.add(tbMsg);
+ tbs.add(tbInsert);
+ tbs.add(tbDelete);
+ tbs.add(tbAlive);
+ }
- @Override
- public void process(Object[] tuple) throws HyracksDataException {
- // vertex Id, vertex
- tbMsg.reset();
- tbAlive.reset();
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ // vertex Id, vertex
+ tbMsg.reset();
+ tbAlive.reset();
- vertex = (Vertex) tuple[1];
- vertex.setOutputWriters(writers);
- vertex.setOutputAppenders(appenders);
- vertex.setOutputTupleBuilders(tbs);
+ vertex = (Vertex) tuple[1];
+ vertex.setOutputWriters(writers);
+ vertex.setOutputAppenders(appenders);
+ vertex.setOutputTupleBuilders(tbs);
- if (!msgIterator.hasNext() && vertex.isHalted())
- return;
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
+ return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
- try {
- vertex.compute(msgIterator);
- vertex.finishCompute();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ try {
+ vertex.compute(msgIterator);
+ vertex.finishCompute();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
- /**
- * this partition should not terminate
- */
- if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
- terminate = false;
+ /**
+ * this partition should not terminate
+ */
+ if (terminate
+ && (!vertex.isHalted() || vertex.hasMessage() || vertex
+ .createdNewLiveVertex()))
+ terminate = false;
- /**
- * call the global aggregator
- */
- aggregator.step(vertex);
- }
+ /**
+ * call the global aggregator
+ */
+ aggregator.step(vertex);
+ }
- @Override
- public void close() throws HyracksDataException {
- FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
- FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
- FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+ FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+ FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
- if (pushAlive)
- FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
- if (!terminate) {
- writeOutTerminationState();
- }
+ if (pushAlive)
+ FrameTupleUtils
+ .flushTuplesFinal(appenderAlive, writerAlive);
+ if (!terminate) {
+ writeOutTerminationState();
+ }
- /** write out global aggregate value */
- writeOutGlobalAggregate();
- }
+ /** write out global aggregate value */
+ writeOutGlobalAggregate();
+ }
- private void writeOutGlobalAggregate() throws HyracksDataException {
- try {
- /**
- * get partial aggregate result and flush to the final
- * aggregator
- */
- Writable agg = aggregator.finishPartial();
- agg.write(tbGlobalAggregate.getDataOutput());
- tbGlobalAggregate.addFieldEndOffset();
- appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
- tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutGlobalAggregate() throws HyracksDataException {
+ try {
+ /**
+ * get partial aggregate result and flush to the final
+ * aggregator
+ */
+ Writable agg = aggregator.finishPartial();
+ agg.write(tbGlobalAggregate.getDataOutput());
+ tbGlobalAggregate.addFieldEndOffset();
+ appenderGlobalAggregate.append(
+ tbGlobalAggregate.getFieldEndOffsets(),
+ tbGlobalAggregate.getByteArray(), 0,
+ tbGlobalAggregate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
+ writerGlobalAggregate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void writeOutTerminationState() throws HyracksDataException {
- try {
- tbTerminate.getDataOutput().writeLong(0);
- tbTerminate.addFieldEndOffset();
- appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
- tbTerminate.getSize());
- FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ private void writeOutTerminationState() throws HyracksDataException {
+ try {
+ tbTerminate.getDataOutput().writeLong(0);
+ tbTerminate.addFieldEndOffset();
+ appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
+ tbTerminate.getByteArray(), 0,
+ tbTerminate.getSize());
+ FrameTupleUtils.flushTuplesFinal(appenderTerminate,
+ writerTerminate);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
- try {
- if (vertex != null && vertex.hasUpdate()) {
- if (!dynamicStateLength) {
- // in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
- } else {
- // write the vertex id
- DataOutput tbOutput = cloneUpdateTb.getDataOutput();
- vertex.getVertexId().write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
+ @Override
+ public void update(ITupleReference tupleRef,
+ ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException {
+ try {
+ if (vertex != null && vertex.hasUpdate()) {
+ if (!dynamicStateLength) {
+ // in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ // write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
- // write the vertex value
- vertex.write(tbOutput);
- cloneUpdateTb.addFieldEndOffset();
- }
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- };
- }
+ // write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+ }
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
}