merged from fullstack_asterix_stabilization to fullstack_lsm_staging -r3100:3171
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3173 123451ca-8445-de46-9d55-352943316053
diff --git a/pom.xml b/pom.xml
index 7d08fb7..6e28021 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.13</version>
+ <version>2.12</version>
<configuration>
<forkMode>pertest</forkMode>
<argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${jvm.extraargs}</argLine>
@@ -53,16 +53,6 @@
</plugins>
</build>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-changelog-plugin</artifactId>
- <version>2.2</version>
- </plugin>
- </plugins>
- </reporting>
-
<distributionManagement>
<repository>
<id>hyracks-releases</id>
diff --git a/pregelix-api/pom.xml b/pregelix-api/pom.xml
index 8212e1c..10efa59 100644
--- a/pregelix-api/pom.xml
+++ b/pregelix-api/pom.xml
@@ -41,9 +41,10 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
- <configuration>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
<filesets>
<fileset>
<directory>.</directory>
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 3a98fd9..cd49184 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,510 +49,525 @@
*/
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- private static long superstep = 0;
- /** Class-wide number of vertices */
- private static long numVertices = -1;
- /** Class-wide number of edges */
- private static long numEdges = -1;
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** Map of destination vertices and their edge values */
- private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
- /** If true, do not do anymore computation on this vertex. */
- boolean halt = false;
- /** List of incoming messages from the previous superstep */
- private final List<M> msgList = new ArrayList<M>();
- /** map context */
- private static TaskAttemptContext context = null;
- /** a delegate for hyracks stuff */
- private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
- /** this vertex is updated or not */
- private boolean updated = false;
- /** has outgoing messages */
- private boolean hasMessage = false;
- /** created new vertex */
- private boolean createdNewLiveVertex = false;
+ implements Writable {
+ private static long superstep = 0;
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** Map of destination vertices and their edge values */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context */
+ private static TaskAttemptContext context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
+ this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
+ /** created new vertex */
+ private boolean createdNewLiveVertex = false;
- /**
- * use object pool for re-using objects
- */
- private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
- private List<M> msgPool = new ArrayList<M>();
- private List<V> valuePool = new ArrayList<V>();
- private int usedEdge = 0;
- private int usedMessage = 0;
- private int usedValue = 0;
+ /**
+ * use object pool for re-using objects
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
- /**
- * The key method that users need to implement
- *
- * @param msgIterator
- * an iterator of incoming messages
- */
- public abstract void compute(Iterator<M> msgIterator);
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ * an iterator of incoming messages
+ */
+ public abstract void compute(Iterator<M> msgIterator);
- /**
- * Add an edge for the vertex.
- *
- * @param targetVertexId
- * @param edgeValue
- * @return successful or not
- */
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- Edge<I, E> edge = this.allocateEdge();
- edge.setDestVertexId(targetVertexId);
- edge.setEdgeValue(edgeValue);
- destEdgeList.add(edge);
- return true;
- }
+ /**
+ * Add an edge for the vertex.
+ *
+ * @param targetVertexId
+ * @param edgeValue
+ * @return successful or not
+ */
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ updated = true;
+ return true;
+ }
- /**
- * Initialize a new vertex
- *
- * @param vertexId
- * @param vertexValue
- * @param edges
- * @param messages
- */
- public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- destEdgeList.clear();
- if (edges != null && !edges.isEmpty()) {
- for (Map.Entry<I, E> entry : edges.entrySet()) {
- destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
- }
- }
- if (messages != null && !messages.isEmpty()) {
- msgList.addAll(messages);
- }
- }
+ /**
+ * Initialize a new vertex
+ *
+ * @param vertexId
+ * @param vertexValue
+ * @param edges
+ * @param messages
+ */
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
+ List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
+ .getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
- /**
- * reset a vertex object: clear its internal states
- */
- public void reset() {
- usedEdge = 0;
- usedMessage = 0;
- usedValue = 0;
- }
+ /**
+ * reset a vertex object: clear its internal states
+ */
+ public void reset() {
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ updated = false;
+ }
- /**
- * Set the vertex id
- *
- * @param vertexId
- */
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- delegate.setVertexId(vertexId);
- }
+ /**
+ * Set the vertex id
+ *
+ * @param vertexId
+ */
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
- /**
- * Get the vertex id
- *
- * @return vertex id
- */
- public final I getVertexId() {
- return vertexId;
- }
+ /**
+ * Get the vertex id
+ *
+ * @return vertex id
+ */
+ public final I getVertexId() {
+ return vertexId;
+ }
- /**
- * Get the vertex value
- *
- * @return the vertex value
- */
- public final V getVertexValue() {
- return vertexValue;
- }
+ /**
+ * Get the vertex value
+ *
+ * @return the vertex value
+ */
+ public final V getVertexValue() {
+ return vertexValue;
+ }
- /**
- * Set the vertex value
- *
- * @param vertexValue
- */
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- this.updated = true;
- }
+ /**
+ * Set the vertex value
+ *
+ * @param vertexValue
+ */
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
- /***
- * Send a message to a specific vertex
- *
- * @param id
- * the receiver vertex id
- * @param msg
- * the message
- */
- public final void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
- }
- delegate.sendMsg(id, msg);
- this.hasMessage = true;
- }
+ /***
+ * Send a message to a specific vertex
+ *
+ * @param id
+ * the receiver vertex id
+ * @param msg
+ * the message
+ */
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
- /**
- * Send a message to all direct outgoing neighbors
- *
- * @param msg
- * the message
- */
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (Edge<I, E> edge : destEdgeList) {
- sendMsg(edge.getDestVertexId(), msg);
- }
- }
+ /**
+ * Send a message to all direct outgoing neighbors
+ *
+ * @param msg
+ * the message
+ */
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
- /**
- * Vote to halt. Once all vertex vote to halt and no more messages, a
- * Pregelix job will terminate.
- */
- public final void voteToHalt() {
- halt = true;
- }
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ */
+ public final void voteToHalt() {
+ halt = true;
+ updated = true;
+ }
- /**
- * @return the vertex is halted (true) or not (false)
- */
- public final boolean isHalted() {
- return halt;
- }
+ /**
+ * @return the vertex is halted (true) or not (false)
+ */
+ public final boolean isHalted() {
+ return halt;
+ }
- @Override
- final public void readFields(DataInput in) throws IOException {
- reset();
- if (vertexId == null)
- vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
- vertexId.readFields(in);
- delegate.setVertexId(vertexId);
- boolean hasVertexValue = in.readBoolean();
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ reset();
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext()
+ .getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
- if (hasVertexValue) {
- vertexValue = allocateValue();
- vertexValue.readFields(in);
- delegate.setVertex(this);
- }
- destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < edgeMapSize; ++i) {
- Edge<I, E> edge = allocateEdge();
- edge.setConf(getContext().getConfiguration());
- edge.readFields(in);
- addEdge(edge);
- }
- msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < msgListSize; ++i) {
- M msg = allocateMessage();
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- updated = false;
- hasMessage = false;
- createdNewLiveVertex = false;
- }
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration());
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false;
+ hasMessage = false;
+ createdNewLiveVertex = false;
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- SerDeUtils.writeVLong(out, destEdgeList.size());
- for (Edge<I, E> edge : destEdgeList) {
- edge.write(out);
- }
- SerDeUtils.writeVLong(out, msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
- /**
- * Get the list of incoming messages
- *
- * @return the list of messages
- */
- public List<M> getMsgList() {
- return msgList;
- }
+ /**
+ * Get the list of incoming messages
+ *
+ * @return the list of messages
+ */
+ public List<M> getMsgList() {
+ return msgList;
+ }
- /**
- * Get outgoing edge list
- *
- * @return a list of outgoing edges
- */
- public List<Edge<I, E>> getEdges() {
- return this.destEdgeList;
- }
+ /**
+ * Get outgoing edge list
+ *
+ * @return a list of outgoing edges
+ */
+ public List<Edge<I, E>> getEdges() {
+ return this.destEdgeList;
+ }
- @Override
- @SuppressWarnings("unchecked")
- public String toString() {
- Collections.sort(destEdgeList);
- StringBuffer edgeBuffer = new StringBuffer();
- edgeBuffer.append("(");
- for (Edge<I, E> edge : destEdgeList) {
- edgeBuffer.append(edge.getDestVertexId()).append(",");
- }
- edgeBuffer.append(")");
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public String toString() {
+ Collections.sort(destEdgeList);
+ StringBuffer edgeBuffer = new StringBuffer();
+ edgeBuffer.append("(");
+ for (Edge<I, E> edge : destEdgeList) {
+ edgeBuffer.append(edge.getDestVertexId()).append(",");
+ }
+ edgeBuffer.append(")");
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
+ + ", edges=" + edgeBuffer + ")";
+ }
- /**
- * Get the number of outgoing edges
- *
- * @return the number of outging edges
- */
- public int getNumOutEdges() {
- return destEdgeList.size();
- }
+ /**
+ * Get the number of outgoing edges
+ *
+ * @return the number of outging edges
+ */
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasUpdate() {
- return this.updated;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasUpdate() {
+ return this.updated;
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasMessage() {
- return this.hasMessage;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
- /**
- * Pregelix internal use only
- */
- public boolean createdNewLiveVertex() {
- return this.createdNewLiveVertex;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean createdNewLiveVertex() {
+ return this.createdNewLiveVertex;
+ }
- /**
- * sort the edges
- */
- @SuppressWarnings("unchecked")
- public void sortEdges() {
- Collections.sort(destEdgeList);
- }
+ /**
+ * sort the edges
+ */
+ @SuppressWarnings("unchecked")
+ public void sortEdges() {
+ updated = true;
+ Collections.sort(destEdgeList);
+ }
- /**
- * Allocate a new edge from the edge pool
- */
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
+ /**
+ * Allocate a new edge from the edge pool
+ */
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
- /**
- * Allocate a new message from the message pool
- */
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
+ /**
+ * Allocate a new message from the message pool
+ */
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedEdge);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext()
+ .getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
- /**
- * Set the global superstep for all the vertices (internal use)
- *
- * @param superstep
- * New superstep
- */
- public static final void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static final void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
- /**
- * Add an outgoing edge into the vertex
- *
- * @param edge
- * the edge to be added
- * @return true if the edge list changed as a result of this call
- */
- public boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- return destEdgeList.add(edge);
- }
+ /**
+ * Add an outgoing edge into the vertex
+ *
+ * @param edge
+ * the edge to be added
+ * @return true if the edge list changed as a result of this call
+ */
+ public boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ updated = true;
+ return destEdgeList.add(edge);
+ }
- /**
- * remove an outgoing edge in the graph
- *
- * @param edge
- * the edge to be removed
- * @return true if the edge is in the edge list of the vertex
- */
- public boolean removeEdge(Edge<I, E> edge) {
- return destEdgeList.remove(edge);
- }
+ /**
+ * remove an outgoing edge in the graph
+ *
+ * @param edge
+ * the edge to be removed
+ * @return true if the edge is in the edge list of the vertex
+ */
+ public boolean removeEdge(Edge<I, E> edge) {
+ updated = true;
+ return destEdgeList.remove(edge);
+ }
- /**
- * Add a new vertex into the graph
- *
- * @param vertexId
- * the vertex id
- * @param vertex
- * the vertex
- */
- public final void addVertex(I vertexId, Vertex vertex) {
- createdNewLiveVertex |= !vertex.isHalted();
- delegate.addVertex(vertexId, vertex);
- }
+ /**
+ * Add a new vertex into the graph
+ *
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
+ */
+ public final void addVertex(I vertexId, Vertex vertex) {
+ createdNewLiveVertex |= !vertex.isHalted();
+ delegate.addVertex(vertexId, vertex);
+ }
- /**
- * Delete a vertex from id
- *
- * @param vertexId
- * the vertex id
- */
- public final void deleteVertex(I vertexId) {
- delegate.deleteVertex(vertexId);
- }
+ /**
+ * Delete a vertex from id
+ *
+ * @param vertexId
+ * the vertex id
+ */
+ public final void deleteVertex(I vertexId) {
+ delegate.deleteVertex(vertexId);
+ }
- /**
- * Allocate a vertex value from the object pool
- *
- * @return a vertex value instance
- */
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedValue);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
+ /**
+ * Allocate a vertex value from the object pool
+ *
+ * @return a vertex value instance
+ */
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext()
+ .getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
- /**
- * Get the current global superstep number
- *
- * @return the current superstep number
- */
- public static final long getSuperstep() {
- return superstep;
- }
+ /**
+ * Get the current global superstep number
+ *
+ * @return the current superstep number
+ */
+ public static final long getSuperstep() {
+ return superstep;
+ }
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static final void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static final void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
- /**
- * Get the number of vertexes in the graph
- *
- * @return the number of vertexes in the graph
- */
- public static final long getNumVertices() {
- return numVertices;
- }
+ /**
+ * Get the number of vertexes in the graph
+ *
+ * @return the number of vertexes in the graph
+ */
+ public static final long getNumVertices() {
+ return numVertices;
+ }
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
- /**
- * Get the number of edges from this graph
- *
- * @return the number of edges in the graph
- */
- public static final long getNumEdges() {
- return numEdges;
- }
+ /**
+ * Get the number of edges from this graph
+ *
+ * @return the number of edges in the graph
+ */
+ public static final long getNumEdges() {
+ return numEdges;
+ }
- /**
- * Pregelix internal use only
- */
- public static final TaskAttemptContext getContext() {
- return context;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public static final TaskAttemptContext getContext() {
+ return context;
+ }
- /**
- * Pregelix internal use only
- *
- * @param context
- */
- public static final void setContext(TaskAttemptContext context) {
- Vertex.context = context;
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param context
+ */
+ public static final void setContext(TaskAttemptContext context) {
+ Vertex.context = context;
+ }
}
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 4c63452..f084614 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -19,8 +19,9 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
+ <version>2.3.2</version>
<executions>
<execution>
<id>balancer</id>
@@ -73,7 +74,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -165,7 +166,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index f07a246..d9b267d 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -141,22 +141,11 @@
start = System.currentTimeMillis();
runHDFSWRite(jobGen);
runCleanup(jobGen);
- destroyApplication(applicationName);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
LOG.info("job finished");
} catch (Exception e) {
- try {
- /**
- * destroy application if there is any exception
- */
- if (hcc != null) {
- destroyApplication(applicationName);
- }
- } catch (Exception e2) {
- throw new HyracksException(e2);
- }
throw new HyracksException(e);
}
}
@@ -214,8 +203,8 @@
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(applicationName, job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
@@ -230,15 +219,11 @@
LOG.info("jar packing finished " + (end - start) + "ms");
start = System.currentTimeMillis();
- hcc.createApplication(applicationName, appZip);
+ // TODO: Fix this step to use Yarn
+ //hcc.createApplication(applicationName, appZip);
end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
}
-
- public void destroyApplication(String appName) throws Exception {
- hcc.destroyApplication(appName);
- }
-
}
class FileFilter implements FilenameFilter {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 419e4b0..fe2fcac 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -262,12 +262,10 @@
*/
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4,
- deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5,
- btreeBulkLoad, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -487,11 +485,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3,
- insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4,
- deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 27394e6..f1eceb7 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -241,11 +241,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
/**
@@ -449,9 +447,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 91a629c..314c393 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -219,26 +219,24 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -408,7 +406,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -417,16 +415,16 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index e653196..0c3db38 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -195,7 +195,7 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/**
* final aggregate write operator
@@ -239,18 +239,16 @@
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
@@ -441,7 +439,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -450,16 +448,16 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 2a2e2bf..d099645 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
public class PregelixHyracksIntegrationUtil {
@@ -64,7 +65,9 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
+ ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -73,7 +76,9 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
+ ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -82,14 +87,6 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void destroyApp(String hyracksAppName) throws Exception {
- hcc.destroyApplication(hyracksAppName);
- }
-
- public static void createApp(String hyracksAppName) throws Exception {
- hcc.createApplication(hyracksAppName, null);
- }
-
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
@@ -98,7 +95,7 @@
public static void runJob(JobSpecification spec, String appName) throws Exception {
spec.setFrameSize(FRAME_SIZE);
- JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
hcc.waitForCompletion(jobId);
}
diff --git a/pregelix-core/src/main/resources/scripts/getip.sh b/pregelix-core/src/main/resources/scripts/getip.sh
index e0cdf73..a691c0f 100755
--- a/pregelix-core/src/main/resources/scripts/getip.sh
+++ b/pregelix-core/src/main/resources/scripts/getip.sh
@@ -6,6 +6,10 @@
then
#Get IP Address
IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ if [ "$IPADDR" = "" ]
+ then
+ IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ fi
if [ "$IPADDR" = "" ]
then
IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix-core/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
#Launch hyracks cc script
chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index a5cb6e2..3c00cad 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -103,7 +103,6 @@
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
PregelixHyracksIntegrationUtil.init();
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -121,7 +120,6 @@
runIndexRightOuterJoin();
TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
}
diff --git a/pregelix-dataflow-std-base/pom.xml b/pregelix-dataflow-std-base/pom.xml
index caa3222..4fda45f 100644
--- a/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix-dataflow-std-base/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-dataflow-std/pom.xml b/pregelix-dataflow-std/pom.xml
index c5a0913..efe1607 100644
--- a/pregelix-dataflow-std/pom.xml
+++ b/pregelix-dataflow-std/pom.xml
@@ -43,8 +43,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-dataflow/pom.xml b/pregelix-dataflow/pom.xml
index d3f396e..2d0859b 100644
--- a/pregelix-dataflow/pom.xml
+++ b/pregelix-dataflow/pom.xml
@@ -43,8 +43,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-example/pom.xml b/pregelix-example/pom.xml
index e643331..84feb78 100644
--- a/pregelix-example/pom.xml
+++ b/pregelix-example/pom.xml
@@ -24,6 +24,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -42,7 +43,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -79,7 +80,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 5773602..e54373f 100644
--- a/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -47,7 +47,7 @@
if (newVertex == null) {
newVertex = new GraphMutationVertex();
}
- if (getVertexId().get() % 2 == 0) {
+ if (getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0) {
deleteVertex(getVertexId());
} else {
vid.set(100 * getVertexId().get());
@@ -59,7 +59,7 @@
} else {
if (getVertexId().get() % 190 == 0) {
deleteVertex(getVertexId());
- }
+ }
voteToHalt();
}
}
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 37f03a5..da6d564 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -77,7 +77,6 @@
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
PregelixHyracksIntegrationUtil.init();
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
LOGGER.info("Hyracks mini-cluster started");
startHDFS();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
@@ -112,7 +111,6 @@
}
public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
LOGGER.info("Hyracks mini-cluster shut down");
cleanupHDFS();
diff --git a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 4b4cc2e..4bf83e6 100644
--- a/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -74,7 +74,6 @@
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
PregelixHyracksIntegrationUtil.init();
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
LOGGER.info("Hyracks mini-cluster started");
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
@@ -126,7 +125,6 @@
}
public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
LOGGER.info("Hyracks mini-cluster shut down");
cleanupHDFS();
@@ -169,7 +167,7 @@
public void run(TestResult result) {
try {
int testCount = countTestCases();
- for (int i = 2; i == 2; i++) {
+ for (int i = 0; i < testCount; i++) {
// cleanupStores();
Test each = this.testAt(i);
if (result.shouldStop())
diff --git a/pregelix-example/src/test/resources/expected/GraphMutation.result b/pregelix-example/src/test/resources/expected/GraphMutation.result
index 3f2fd60..a30166c 100644
--- a/pregelix-example/src/test/resources/expected/GraphMutation.result
+++ b/pregelix-example/src/test/resources/expected/GraphMutation.result
@@ -1,19 +1,13 @@
1 0.0
-3 0.0
5 0.0
7 0.0
-9 0.0
11 0.0
13 0.0
-15 0.0
17 0.0
19 0.0
100 0.0
-300 0.0
500 0.0
700 0.0
-900 0.0
1100 0.0
1300 0.0
-1500 0.0
1700 0.0
diff --git a/pregelix-runtime/pom.xml b/pregelix-runtime/pom.xml
index e75f98b..94bda18 100644
--- a/pregelix-runtime/pom.xml
+++ b/pregelix-runtime/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..fbebc66
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+ @Override
+ public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ RuntimeContext rCtx = new RuntimeContext(ncAppCtx);
+ ncAppCtx.setApplicationObject(rCtx);
+ }
+
+ @Override
+ public void notifyStartupComplete() throws Exception {
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
deleted file mode 100644
index 76c725e..0000000
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.application.INCBootstrap;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class NCBootstrapImpl implements INCBootstrap {
- private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
- private INCApplicationContext appCtx;
-
- @Override
- public void start() throws Exception {
- LOGGER.info("Starting NC Bootstrap");
- RuntimeContext rCtx = new RuntimeContext(appCtx);
- appCtx.setApplicationObject(rCtx);
- LOGGER.info("Initialized RuntimeContext: " + rCtx);
- }
-
- @Override
- public void stop() throws Exception {
- LOGGER.info("Stopping NC Bootstrap");
- RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
- rCtx.close();
- }
-
- @Override
- public void setApplicationContext(INCApplicationContext appCtx) {
- this.appCtx = appCtx;
- }
-}
\ No newline at end of file